error check for overflow
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37
38 #include <gni_pub.h>
39 #include <pmi.h>
40 //#include <numatoolkit.h>
41
42 #include "converse.h"
43
44 #if CMK_DIRECT
45 #include "cmidirect.h"
46 #endif
47
48 #define     LARGEPAGE              0
49
50 #if CMK_SMP
51 #define MULTI_THREAD_SEND          0
52 #define COMM_THREAD_SEND           1
53 #endif
54
55 #if MULTI_THREAD_SEND
56 #define CMK_WORKER_SINGLE_TASK     0
57 #endif
58
59 #define REMOTE_EVENT               0
60 #define CQWRITE                    0
61
62 #define CMI_EXERT_SEND_CAP      0
63 #define CMI_EXERT_RECV_CAP      0
64
65 #if CMI_EXERT_SEND_CAP
66 #define SEND_CAP 16
67 #endif
68
69 #if CMI_EXERT_RECV_CAP
70 #define RECV_CAP 2
71 #endif
72
73 #define USE_LRTS_MEMPOOL                  1
74
75 #define PRINT_SYH                         0
76
77 // Trace communication thread
78 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
79 #define TRACE_THRESHOLD     0.00005
80 #define CMI_MPI_TRACE_MOREDETAILED 0
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_SMP_TRACE_COMMTHREAD
85 #define CMK_SMP_TRACE_COMMTHREAD 0
86 #endif
87
88 #define CMK_TRACE_COMMOVERHEAD 0
89 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
90 #undef CMI_MPI_TRACE_USEREVENTS
91 #define CMI_MPI_TRACE_USEREVENTS 1
92 #else
93 #undef CMK_TRACE_COMMOVERHEAD
94 #define CMK_TRACE_COMMOVERHEAD 0
95 #endif
96
97 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
98 CpvStaticDeclare(double, projTraceStart);
99 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
100 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
101 #else
102 #define  START_EVENT()
103 #define  END_EVENT(x)
104 #endif
105
106 #if USE_LRTS_MEMPOOL
107
108 #define oneMB (1024ll*1024)
109 #define oneGB (1024ll*1024*1024)
110
111 static CmiInt8 _mempool_size = 8*oneMB;
112 static CmiInt8 _expand_mem =  4*oneMB;
113 static CmiInt8 _mempool_size_limit = 0;
114
115 static CmiInt8 _totalmem = 0.8*oneGB;
116
117 #if LARGEPAGE
118 static CmiInt8 BIG_MSG  =  16*oneMB;
119 static CmiInt8 ONE_SEG  =  4*oneMB;
120 #else
121 static CmiInt8 BIG_MSG  =  4*oneMB;
122 static CmiInt8 ONE_SEG  =  2*oneMB;
123 #endif
124 #if MULTI_THREAD_SEND
125 static int BIG_MSG_PIPELINE = 1;
126 #else
127 static int BIG_MSG_PIPELINE = 4;
128 #endif
129
130 // dynamic flow control
131 static CmiInt8 buffered_send_msg = 0;
132 static CmiInt8 register_memory_size = 0;
133
134 #if LARGEPAGE
135 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
136 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
137 static CmiInt8  register_count = 0;
138 #else
139 #if CMK_SMP && COMM_THREAD_SEND 
140 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
141 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
142 #else
143 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
144 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
145 #endif
146
147
148 #endif
149
150 #endif     /* end USE_LRTS_MEMPOOL */
151
152 #if MULTI_THREAD_SEND 
153 #define     CMI_GNI_LOCK(x)       CmiLock(x);
154 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
155 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
156 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
157 #else
158 #define     CMI_GNI_LOCK(x)
159 #define     CMI_GNI_UNLOCK(x)
160 #define     CMI_PCQUEUEPOP_LOCK(Q)   
161 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
162 #endif
163
164 static int _tlbpagesize = 4096;
165
166 //static int _smpd_count  = 0;
167
168 static int   user_set_flag  = 0;
169
170 static int _checkProgress = 1;             /* check deadlock */
171 static int _detected_hang = 0;
172
173 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
174
175 // dynamic SMSG
176 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
177
178 static int avg_smsg_connection = 32;
179 static int                 *smsg_connected_flag= 0;
180 static gni_smsg_attr_t     **smsg_attr_vector_local;
181 static gni_smsg_attr_t     **smsg_attr_vector_remote;
182 static gni_ep_handle_t     ep_hndl_unbound;
183 static gni_smsg_attr_t     send_smsg_attr;
184 static gni_smsg_attr_t     recv_smsg_attr;
185
186 typedef struct _dynamic_smsg_mailbox{
187    void     *mailbox_base;
188    int      size;
189    int      offset;
190    gni_mem_handle_t  mem_hndl;
191    struct      _dynamic_smsg_mailbox  *next;
192 }dynamic_smsg_mailbox_t;
193
194 static dynamic_smsg_mailbox_t  *mailbox_list;
195
196 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
197 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
198
199 #if PRINT_SYH
200 int         lrts_send_msg_id = 0;
201 int         lrts_local_done_msg = 0;
202 int         lrts_send_rdma_success = 0;
203 #endif
204
205 #include "machine.h"
206
207 #include "pcqueue.h"
208
209 #include "mempool.h"
210
211 #if CMK_PERSISTENT_COMM
212 #include "machine-persistent.h"
213 #endif
214
215 //#define  USE_ONESIDED 1
216 #ifdef USE_ONESIDED
217 //onesided implementation is wrong, since no place to restore omdh
218 #include "onesided.h"
219 onesided_hnd_t   onesided_hnd;
220 onesided_md_t    omdh;
221 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
222
223 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
224
225 #else
226 uint8_t   onesided_hnd, omdh;
227
228 #if REMOTE_EVENT || CQWRITE 
229 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
230     if(register_memory_size+size>= MAX_REG_MEM) { \
231         status = GNI_RC_ERROR_NOMEM;} \
232     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
233         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
234 #else
235 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
236         if (register_memory_size + size >= MAX_REG_MEM) { \
237             status = GNI_RC_ERROR_NOMEM; \
238         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
239             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
240 #endif
241
242 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
243     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
244              register_memory_size -= size; \
245          else CmiAbort("MEM_DEregister");  \
246     } while (0)
247 #endif
248
249 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
250 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
251 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
252 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
253 #define   IncreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)++
254 #define   DecreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)--
255 #define   IncreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)++
256 #define   DecreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)--
257 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
258 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
259 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
260 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
261 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
262 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
263
264 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
265 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
266
267 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
268 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
269 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
270 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
271
272 #define ALIGNBUF                64
273
274 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
275 /* If SMSG is not used */
276
277 #define FMA_PER_CORE  1024
278 #define FMA_BUFFER_SIZE 1024
279
280 /* If SMSG is used */
281 static int  SMSG_MAX_MSG = 1024;
282 #define SMSG_MAX_CREDIT 72 
283
284 #define MSGQ_MAXSIZE       2048
285 /* large message transfer with FMA or BTE */
286 #define LRTS_GNI_RDMA_THRESHOLD  1024 
287
288 #if CMK_SMP
289 static int  REMOTE_QUEUE_ENTRIES=163840; 
290 static int LOCAL_QUEUE_ENTRIES=163840; 
291 #else
292 static int  REMOTE_QUEUE_ENTRIES=20480;
293 static int LOCAL_QUEUE_ENTRIES=20480; 
294 #endif
295
296 #define BIG_MSG_TAG             0x26
297 #define PUT_DONE_TAG            0x28
298 #define DIRECT_PUT_DONE_TAG     0x29
299 #define ACK_TAG                 0x30
300 /* SMSG is data message */
301 #define SMALL_DATA_TAG          0x31
302 #define SMALL_DATA_ACK_TAG      0x32
303 /* SMSG is a control message to initialize a BTE */
304 #define LMSG_INIT_TAG           0x39 
305 #define LMSG_INIT_ACK_TAG       0x3a 
306
307 #define DEBUG
308 #ifdef GNI_RC_CHECK
309 #undef GNI_RC_CHECK
310 #endif
311 #ifdef DEBUG
312 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
313 #else
314 #define GNI_RC_CHECK(msg,rc)
315 #endif
316
317 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
318 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
319 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
320
321 static int useStaticMSGQ = 0;
322 static int useStaticFMA = 0;
323 static int mysize, myrank;
324 static gni_nic_handle_t   nic_hndl;
325
326 typedef struct {
327     gni_mem_handle_t mdh;
328     uint64_t addr;
329 } mdh_addr_t ;
330 // this is related to dynamic SMSG
331
332 typedef struct mdh_addr_list{
333     gni_mem_handle_t mdh;
334    void *addr;
335     struct mdh_addr_list *next;
336 }mdh_addr_list_t;
337
338 static unsigned int         smsg_memlen;
339 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
340 mdh_addr_t          setup_mem;
341 mdh_addr_t          *smsg_connection_vec = 0;
342 gni_mem_handle_t    smsg_connection_memhndl;
343 static int          smsg_expand_slots = 10;
344 static int          smsg_available_slot = 0;
345 static void         *smsg_mailbox_mempool = 0;
346 mdh_addr_list_t     *smsg_dynamic_list = 0;
347
348 static void             *smsg_mailbox_base;
349 gni_msgq_attr_t         msgq_attrs;
350 gni_msgq_handle_t       msgq_handle;
351 gni_msgq_ep_attr_t      msgq_ep_attrs;
352 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
353
354 /* =====Beginning of Declarations of Machine Specific Variables===== */
355 static int cookie;
356 static int modes = 0;
357 static gni_cq_handle_t       smsg_rx_cqh = NULL;
358 static gni_cq_handle_t       default_tx_cqh = NULL;
359 static gni_cq_handle_t       rdma_tx_cqh = NULL;
360 static gni_cq_handle_t       rdma_rx_cqh = NULL;
361 static gni_cq_handle_t       post_tx_cqh = NULL;
362 static gni_ep_handle_t       *ep_hndl_array;
363
364 static CmiNodeLock           *ep_lock_array;
365 static CmiNodeLock           default_tx_cq_lock; 
366 static CmiNodeLock           rdma_tx_cq_lock; 
367 static CmiNodeLock           global_gni_lock; 
368 static CmiNodeLock           rx_cq_lock;
369 static CmiNodeLock           smsg_mailbox_lock;
370 static CmiNodeLock           smsg_rx_cq_lock;
371 static CmiNodeLock           *mempool_lock;
372
373 typedef struct msg_list
374 {
375     uint32_t destNode;
376     uint32_t size;
377     void *msg;
378     uint8_t tag;
379 #if !CMK_SMP
380     struct msg_list *next;
381 #endif
382 }MSG_LIST;
383
384
385 typedef struct control_msg
386 {
387     uint64_t            source_addr;    /* address from the start of buffer  */
388     uint64_t            dest_addr;      /* address from the start of buffer */
389     int                 total_length;   /* total length */
390     int                 length;         /* length of this packet */
391 #if REMOTE_EVENT
392     int                 ack_index;      /* index from integer to address */
393 #endif
394     uint8_t             seq_id;         //big message   0 meaning single message
395     gni_mem_handle_t    source_mem_hndl;
396     struct control_msg *next;
397 } CONTROL_MSG;
398
399 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
400
401 typedef struct ack_msg
402 {
403     uint64_t            source_addr;    /* address from the start of buffer  */
404 #if ! USE_LRTS_MEMPOOL
405     gni_mem_handle_t    source_mem_hndl;
406     int                 length;          /* total length */
407 #endif
408     struct ack_msg     *next;
409 } ACK_MSG;
410
411 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
412
413 #if CMK_DIRECT
414 typedef struct{
415     uint64_t    handler_addr;
416 }CMK_DIRECT_HEADER;
417
418 typedef struct {
419     char core[CmiMsgHeaderSizeBytes];
420     uint64_t handler;
421 }cmidirectMsg;
422
423 //SYH
424 CpvDeclare(int, CmiHandleDirectIdx);
425 void CmiHandleDirectMsg(cmidirectMsg* msg)
426 {
427
428     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
429    (*(_handle->callbackFnPtr))(_handle->callbackData);
430    CmiFree(msg);
431 }
432
433 void CmiDirectInit()
434 {
435     CpvInitialize(int,  CmiHandleDirectIdx);
436     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
437 }
438
439 #endif
440 typedef struct  rmda_msg
441 {
442     int                   destNode;
443 #if REMOTE_EVENT
444     int                   ack_index;
445 #endif
446     gni_post_descriptor_t *pd;
447 #if !CMK_SMP
448     struct  rmda_msg      *next;
449 #endif
450 }RDMA_REQUEST;
451
452
453 #if CMK_SMP
454 #define SMP_LOCKS               0
455 #define ONE_SEND_QUEUE                  0
456 PCQueue sendRdmaBuf;
457 typedef struct  msg_list_index
458 {
459     PCQueue     sendSmsgBuf;
460     int         pushed;
461     CmiNodeLock   lock;
462 } MSG_LIST_INDEX;
463 char                *destpe_avail;
464 #if  !ONE_SEND_QUEUE && SMP_LOCKS
465     PCQueue     nonEmptyQueues;
466 #endif
467 #else         /* non-smp */
468
469 static RDMA_REQUEST        *sendRdmaBuf = 0;
470 static RDMA_REQUEST        *sendRdmaTail = 0;
471 typedef struct  msg_list_index
472 {
473     int         next;
474     MSG_LIST    *sendSmsgBuf;
475     MSG_LIST    *tail;
476 } MSG_LIST_INDEX;
477
478 #endif
479
480 // buffered send queue
481 #if ! ONE_SEND_QUEUE
482 typedef struct smsg_queue
483 {
484     MSG_LIST_INDEX   *smsg_msglist_index;
485     int               smsg_head_index;
486 } SMSG_QUEUE;
487 #else
488 typedef struct smsg_queue
489 {
490     PCQueue       sendMsgBuf;
491 }  SMSG_QUEUE;
492 #endif
493
494 SMSG_QUEUE                  smsg_queue;
495 #if CMK_USE_OOB
496 SMSG_QUEUE                  smsg_oob_queue;
497 #endif
498
499 #if CMK_SMP
500
501 #define FreeMsgList(d)   free(d);
502 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
503
504 #else
505
506 static MSG_LIST       *msglist_freelist=0;
507
508 #define FreeMsgList(d)  \
509   do { \
510   (d)->next = msglist_freelist;\
511   msglist_freelist = d; \
512   } while (0)
513
514 #define MallocMsgList(d) \
515   do {  \
516   d = msglist_freelist;\
517   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
518              _MEMCHECK(d);\
519   } else msglist_freelist = d->next; \
520   d->next =0;  \
521   } while (0)
522
523 #endif
524
525 #if CMK_SMP
526
527 #define FreeControlMsg(d)      free(d);
528 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
529
530 #else
531
532 static CONTROL_MSG    *control_freelist=0;
533
534 #define FreeControlMsg(d)       \
535   do { \
536   (d)->next = control_freelist;\
537   control_freelist = d; \
538   } while (0);
539
540 #define MallocControlMsg(d) \
541   do {  \
542   d = control_freelist;\
543   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
544              _MEMCHECK(d);\
545   } else control_freelist = d->next;  \
546   } while (0);
547
548 #endif
549
550 #if CMK_SMP
551
552 #define FreeAckMsg(d)      free(d);
553 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
554
555 #else
556
557 static ACK_MSG        *ack_freelist=0;
558
559 #define FreeAckMsg(d)       \
560   do { \
561   (d)->next = ack_freelist;\
562   ack_freelist = d; \
563   } while (0)
564
565 #define MallocAckMsg(d) \
566   do { \
567   d = ack_freelist;\
568   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
569              _MEMCHECK(d);\
570   } else ack_freelist = d->next; \
571   } while (0)
572
573 #endif
574
575
576 # if CMK_SMP
577 #define FreeRdmaRequest(d)       free(d);
578 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
579 #else
580
581 static RDMA_REQUEST         *rdma_freelist = NULL;
582
583 #define FreeRdmaRequest(d)       \
584   do {  \
585   (d)->next = rdma_freelist;\
586   rdma_freelist = d;    \
587   } while (0)
588
589 #define MallocRdmaRequest(d) \
590   do {   \
591   d = rdma_freelist;\
592   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
593              _MEMCHECK(d);\
594   } else rdma_freelist = d->next; \
595   d->next =0;   \
596   } while (0)
597 #endif
598
599 /* reuse gni_post_descriptor_t */
600 static gni_post_descriptor_t *post_freelist=0;
601
602 #if  !CMK_SMP
603 #define FreePostDesc(d)       \
604     (d)->next_descr = post_freelist;\
605     post_freelist = d;
606
607 #define MallocPostDesc(d) \
608   d = post_freelist;\
609   if (d==0) { \
610      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
611      d->next_descr = 0;\
612       _MEMCHECK(d);\
613   } else post_freelist = d->next_descr;
614 #else
615
616 #define FreePostDesc(d)     free(d);
617 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
618
619 #endif
620
621
622 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
623 static int      buffered_smsg_counter = 0;
624
625 /* SmsgSend return success but message sent is not confirmed by remote side */
626 static MSG_LIST *buffered_fma_head = 0;
627 static MSG_LIST *buffered_fma_tail = 0;
628
629 /* functions  */
630 #define IsFree(a,ind)  !( a& (1<<(ind) ))
631 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
632 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
633
634 CpvDeclare(mempool_type*, mempool);
635
636 #if REMOTE_EVENT
637 /* ack pool for remote events */
638
639 #define ACK_SHIFT                  19
640 #define ACK_EVENT(idx)             ((idx<<ACK_SHIFT) | myrank)
641 #define ACK_GET_RANK(evt)          (evt & ((1<<ACK_SHIFT)-1))
642 #define ACK_GET_INDEX(evt)         (evt >> ACK_SHIFT)
643
644 struct AckPool {
645 void *addr;
646 int next;
647 };
648
649 static struct AckPool   *ackpool;
650 static int    ackpoolsize;
651 static int    ackpool_freehead;
652 static CmiNodeLock  ackpool_lock;
653
654 #define  GetAckAddress(s)          (ackpool[s].addr)
655
656 static void AckPool_init()
657 {
658     int i;
659     if ((1<<ACK_SHIFT) < mysize) 
660         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
661     ackpoolsize = 2048;
662     ackpool = (struct AckPool *)malloc(ackpoolsize*sizeof(struct AckPool));
663     for (i=0; i<ackpoolsize-1; i++) {
664         ackpool[i].next = i+1;
665     }
666     ackpool[i].next = -1;
667     ackpool_freehead = 0;
668 #if MULTI_THREAD_SEND 
669     ackpool_lock  = CmiCreateLock();
670 #endif
671 }
672
673 static
674 inline int AckPool_getslot(void *addr)
675 {
676     int i;
677     int s;
678     CMI_GNI_LOCK(ackpool_lock);
679     s = ackpool_freehead;
680     if (s == -1) {
681         int newsize = ackpoolsize * 2;
682         printf("[%d] AckPool_getslot expand to: %d\n", myrank, newsize);
683         if (newsize > (1<<(32-ACK_SHIFT))) CmiAbort("AckPool too large");
684         struct AckPool *old_ackpool = ackpool;
685         ackpool = (struct AckPool *)malloc(newsize*sizeof(struct AckPool));
686         memcpy(ackpool, old_ackpool, ackpoolsize*sizeof(struct AckPool));
687         for (i=ackpoolsize; i<newsize-1; i++) {
688             ackpool[i].next = i+1;
689         }
690         ackpool[i].next = -1;
691         ackpool_freehead = ackpoolsize;
692         s = ackpoolsize;
693         ackpoolsize = newsize;
694         free(old_ackpool);
695     }
696     ackpool_freehead = ackpool[s].next;
697     ackpool[s].addr = addr;
698     CMI_GNI_UNLOCK(ackpool_lock);
699     return s;
700 }
701
702 static
703 inline  void AckPool_freeslot(int s)
704 {
705     CmiAssert(s>=0 && s<ackpoolsize);
706     CMI_GNI_LOCK(ackpool_lock);
707     ackpool[s].next = ackpool_freehead;
708     ackpool_freehead = s;
709     CMI_GNI_UNLOCK(ackpool_lock);
710 }
711
712
713 #endif
714
715 #if CMK_WITH_STATS
716 typedef struct comm_thread_stats
717 {
718 int      count_in_send_buffered_ack;
719 double   time_in_send_buffered_ack;
720 double   max_time_in_send_buffered_ack;
721 int      count_in_send_buffered_smsg;
722 double   time_in_send_buffered_smsg;
723 double   max_time_in_send_buffered_smsg;
724 } Comm_Thread_Stats;
725
726 static Comm_Thread_Stats   comm_stats;
727
728 static void init_comm_stats()
729 {
730   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
731 }
732
733 #define STATS_ACK_TIME(x)   \
734         { double t = CmiWallTimer(); \
735           x;        \
736           t = CmiWallTimer() - t;          \
737           comm_stats.count_in_send_buffered_ack ++;        \
738           comm_stats.time_in_send_buffered_ack += t;   \
739           if (t>comm_stats.max_time_in_send_buffered_ack)      \
740               comm_stats.max_time_in_send_buffered_ack = t;    \
741         }
742
743 #define STATS_SEND_SMSGS_TIME(x)   \
744         { double t = CmiWallTimer(); \
745           x;        \
746           t = CmiWallTimer() - t;          \
747           comm_stats.count_in_send_buffered_smsg ++;        \
748           comm_stats.time_in_send_buffered_smsg += t;   \
749           if (t>comm_stats.max_time_in_send_buffered_smsg)      \
750               comm_stats.max_time_in_send_buffered_smsg = t;    \
751         }
752
753 static void print_comm_stats()
754 {
755     if (myrank == 0) 
756     printf("PE[%d]                    count\ttime\tmax \n", myrank);
757     printf("PE[%d]  send buffered ack:   %d\t%f\t%f\n",  myrank, comm_stats.count_in_send_buffered_ack, comm_stats.time_in_send_buffered_ack, comm_stats.max_time_in_send_buffered_ack);
758     printf("PE[%d]  send smsgs:          %d\t%f\t%f\n",  myrank, comm_stats.count_in_send_buffered_smsg, comm_stats.time_in_send_buffered_smsg, comm_stats.max_time_in_send_buffered_smsg);
759 }
760 #else
761 #define STATS_ACK_TIME(x)            x
762 #define STATS_SEND_SMSGS_TIME(x)     x
763 #endif
764
765 static int print_stats = 0;
766
767 static void
768 allgather(void *in,void *out, int len)
769 {
770     static int *ivec_ptr=NULL,already_called=0,job_size=0;
771     int i,rc;
772     int my_rank;
773     char *tmp_buf,*out_ptr;
774
775     if(!already_called) {
776
777         rc = PMI_Get_size(&job_size);
778         CmiAssert(rc == PMI_SUCCESS);
779         rc = PMI_Get_rank(&my_rank);
780         CmiAssert(rc == PMI_SUCCESS);
781
782         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
783         CmiAssert(ivec_ptr != NULL);
784
785         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
786         CmiAssert(rc == PMI_SUCCESS);
787
788         already_called = 1;
789
790     }
791
792     tmp_buf = (char *)malloc(job_size * len);
793     CmiAssert(tmp_buf);
794
795     rc = PMI_Allgather(in,tmp_buf,len);
796     CmiAssert(rc == PMI_SUCCESS);
797
798     out_ptr = out;
799
800     for(i=0;i<job_size;i++) {
801
802         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
803
804     }
805
806     free(tmp_buf);
807 }
808
809 static void
810 allgather_2(void *in,void *out, int len)
811 {
812     //PMI_Allgather is out of order
813     int i,rc, extend_len;
814     int  rank_index;
815     char *out_ptr, *out_ref;
816     char *in2;
817
818     extend_len = sizeof(int) + len;
819     in2 = (char*)malloc(extend_len);
820
821     memcpy(in2, &myrank, sizeof(int));
822     memcpy(in2+sizeof(int), in, len);
823
824     out_ptr = (char*)malloc(mysize*extend_len);
825
826     rc = PMI_Allgather(in2, out_ptr, extend_len);
827     GNI_RC_CHECK("allgather", rc);
828
829     out_ref = out;
830
831     for(i=0;i<mysize;i++) {
832         //rank index 
833         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
834         //copy to the rank index slot
835         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
836     }
837
838     free(out_ptr);
839     free(in2);
840
841 }
842
843 static unsigned int get_gni_nic_address(int device_id)
844 {
845     unsigned int address, cpu_id;
846     gni_return_t status;
847     int i, alps_dev_id=-1,alps_address=-1;
848     char *token, *p_ptr;
849
850     p_ptr = getenv("PMI_GNI_DEV_ID");
851     if (!p_ptr) {
852         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
853        
854         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
855     } else {
856         while ((token = strtok(p_ptr,":")) != NULL) {
857             alps_dev_id = atoi(token);
858             if (alps_dev_id == device_id) {
859                 break;
860             }
861             p_ptr = NULL;
862         }
863         CmiAssert(alps_dev_id != -1);
864         p_ptr = getenv("PMI_GNI_LOC_ADDR");
865         CmiAssert(p_ptr != NULL);
866         i = 0;
867         while ((token = strtok(p_ptr,":")) != NULL) {
868             if (i == alps_dev_id) {
869                 alps_address = atoi(token);
870                 break;
871             }
872             p_ptr = NULL;
873             ++i;
874         }
875         CmiAssert(alps_address != -1);
876         address = alps_address;
877     }
878     return address;
879 }
880
881 static uint8_t get_ptag(void)
882 {
883     char *p_ptr, *token;
884     uint8_t ptag;
885
886     p_ptr = getenv("PMI_GNI_PTAG");
887     CmiAssert(p_ptr != NULL);
888     token = strtok(p_ptr, ":");
889     ptag = (uint8_t)atoi(token);
890     return ptag;
891         
892 }
893
894 static uint32_t get_cookie(void)
895 {
896     uint32_t cookie;
897     char *p_ptr, *token;
898
899     p_ptr = getenv("PMI_GNI_COOKIE");
900     CmiAssert(p_ptr != NULL);
901     token = strtok(p_ptr, ":");
902     cookie = (uint32_t)atoi(token);
903
904     return cookie;
905 }
906
907 #if LARGEPAGE
908
909 /* directly mmap memory from hugetlbfs for large pages */
910
911 #include <sys/stat.h>
912 #include <fcntl.h>
913 #include <sys/mman.h>
914 #include <hugetlbfs.h>
915
916 // size must be _tlbpagesize aligned
917 void *my_get_huge_pages(size_t size)
918 {
919     char filename[512];
920     int fd;
921     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
922     void *ptr = NULL;
923
924     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
925     fd = open(filename, O_RDWR | O_CREAT, mode);
926     if (fd == -1) {
927         CmiAbort("my_get_huge_pages: open filed");
928     }
929     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
930     if (ptr == MAP_FAILED) ptr = NULL;
931 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
932     close(fd);
933     unlink(filename);
934     return ptr;
935 }
936
937 void my_free_huge_pages(void *ptr, int size)
938 {
939 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
940     int ret = munmap(ptr, size);
941     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
942 }
943
944 #endif
945
946 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
947 /* TODO: add any that are related */
948 /* =====End of Definitions of Message-Corruption Related Macros=====*/
949
950
951 #include "machine-lrts.h"
952 #include "machine-common-core.c"
953
954 /* Network progress function is used to poll the network when for
955    messages. This flushes receive buffers on some  implementations*/
956 #if CMK_MACHINE_PROGRESS_DEFINED
957 void CmiMachineProgressImpl() {
958 }
959 #endif
960
961 static int SendBufferMsg(SMSG_QUEUE *queue);
962 static void SendRdmaMsg();
963 static void PumpNetworkSmsg();
964 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
965 #if CQWRITE
966 static void PumpCqWriteTransactions();
967 #endif
968 #if REMOTE_EVENT
969 static void PumpRemoteTransactions();
970 #endif
971
972 #if MACHINE_DEBUG_LOG
973 FILE *debugLog = NULL;
974 static CmiInt8 buffered_recv_msg = 0;
975 int         lrts_smsg_success = 0;
976 int         lrts_received_msg = 0;
977 #endif
978
979 static void sweep_mempool(mempool_type *mptr)
980 {
981     int n = 0;
982     block_header *current = &(mptr->block_head);
983
984     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
985     while( current!= NULL) {
986         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
987         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
988     }
989     printf("[n %d] sweep_mempool slot END.\n", myrank);
990 }
991
992 inline
993 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
994 {
995     block_header *current = *from;
996
997     //while(register_memory_size>= MAX_REG_MEM)
998     //{
999         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1000             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1001
1002         *from = current;
1003         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1004         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1005         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1006     //}
1007     return GNI_RC_SUCCESS;
1008 }
1009
1010 inline 
1011 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1012 {
1013     gni_return_t status = GNI_RC_SUCCESS;
1014     //int size = GetMempoolsize(msg);
1015     //void *blockaddr = GetMempoolBlockPtr(msg);
1016     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1017    
1018     block_header *current = &(mptr->block_head);
1019     while(register_memory_size>= MAX_REG_MEM)
1020     {
1021         status = deregisterMemory(mptr, &current);
1022         if (status != GNI_RC_SUCCESS) break;
1023     }
1024     if(register_memory_size>= MAX_REG_MEM) return status;
1025
1026     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1027     while(1)
1028     {
1029         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1030         if(status == GNI_RC_SUCCESS)
1031         {
1032             break;
1033         }
1034         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1035         {
1036             CmiAbort("Memory registor for mempool fails\n");
1037         }
1038         else
1039         {
1040             status = deregisterMemory(mptr, &current);
1041             if (status != GNI_RC_SUCCESS) break;
1042         }
1043     }; 
1044     return status;
1045 }
1046
1047 inline 
1048 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1049 {
1050     static int rank = -1;
1051     int i;
1052     gni_return_t status;
1053     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1054     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1055     mempool_type *mptr;
1056
1057     status = registerFromMempool(mptr1, msg, size, t, cqh);
1058     if (status == GNI_RC_SUCCESS) return status;
1059 #if CMK_SMP 
1060     for (i=0; i<CmiMyNodeSize()+1; i++) {
1061       rank = (rank+1)%(CmiMyNodeSize()+1);
1062       mptr = CpvAccessOther(mempool, rank);
1063       if (mptr == mptr1) continue;
1064       status = registerFromMempool(mptr, msg, size, t, cqh);
1065       if (status == GNI_RC_SUCCESS) return status;
1066     }
1067 #endif
1068     return  GNI_RC_ERROR_RESOURCE;
1069 }
1070
1071 inline
1072 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1073 {
1074     MSG_LIST        *msg_tmp;
1075     MallocMsgList(msg_tmp);
1076     msg_tmp->destNode = destNode;
1077     msg_tmp->size   = size;
1078     msg_tmp->msg    = msg;
1079     msg_tmp->tag    = tag;
1080
1081 #if !CMK_SMP
1082     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1083         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1084         queue->smsg_head_index = destNode;
1085         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1086     }else
1087     {
1088         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1089         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1090     }
1091 #else
1092 #if ONE_SEND_QUEUE
1093     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1094 #else
1095 #if SMP_LOCKS
1096     CmiLock(queue->smsg_msglist_index[destNode].lock);
1097     if(queue->smsg_msglist_index[destNode].pushed == 0)
1098     {
1099         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1100     }
1101     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1102     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1103 #else
1104     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1105 #endif
1106 #endif
1107 #endif
1108 #if PRINT_SYH
1109     buffered_smsg_counter++;
1110 #endif
1111 }
1112
1113 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1114 {
1115     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1116 }
1117
1118 inline
1119 static void setup_smsg_connection(int destNode)
1120 {
1121     mdh_addr_list_t  *new_entry = 0;
1122     gni_post_descriptor_t *pd;
1123     gni_smsg_attr_t      *smsg_attr;
1124     gni_return_t status = GNI_RC_NOT_DONE;
1125     RDMA_REQUEST        *rdma_request_msg;
1126     
1127     if(smsg_available_slot == smsg_expand_slots)
1128     {
1129         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1130         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1131         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1132
1133         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1134             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1135             GNI_MEM_READWRITE,   
1136             -1,
1137             &(new_entry->mdh));
1138         smsg_available_slot = 0; 
1139         new_entry->next = smsg_dynamic_list;
1140         smsg_dynamic_list = new_entry;
1141     }
1142     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1143     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1144     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1145     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1146     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1147     smsg_attr->buff_size = smsg_memlen;
1148     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1149     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1150     smsg_local_attr_vec[destNode] = smsg_attr;
1151     smsg_available_slot++;
1152     MallocPostDesc(pd);
1153     pd->type            = GNI_POST_FMA_PUT;
1154     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1155     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1156     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1157     pd->length          = sizeof(gni_smsg_attr_t);
1158     pd->local_addr      = (uint64_t) smsg_attr;
1159     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1160     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1161     pd->src_cq_hndl     = rdma_tx_cqh;
1162     pd->rdma_mode       = 0;
1163     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1164     print_smsg_attr(smsg_attr);
1165     if(status == GNI_RC_ERROR_RESOURCE )
1166     {
1167         MallocRdmaRequest(rdma_request_msg);
1168         rdma_request_msg->destNode = destNode;
1169         rdma_request_msg->pd = pd;
1170         /* buffer this request */
1171     }
1172 #if PRINT_SYH
1173     if(status != GNI_RC_SUCCESS)
1174        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1175     else
1176         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1177 #endif
1178 }
1179
1180 /* useDynamicSMSG */
1181 inline 
1182 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1183 {
1184     gni_return_t status = GNI_RC_NOT_DONE;
1185
1186     if(mailbox_list->offset == mailbox_list->size)
1187     {
1188         dynamic_smsg_mailbox_t *new_mailbox_entry;
1189         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1190         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1191         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1192         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1193         new_mailbox_entry->offset = 0;
1194         
1195         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1196             new_mailbox_entry->size, smsg_rx_cqh,
1197             GNI_MEM_READWRITE,   
1198             -1,
1199             &(new_mailbox_entry->mem_hndl));
1200
1201         GNI_RC_CHECK("register", status);
1202         new_mailbox_entry->next = mailbox_list;
1203         mailbox_list = new_mailbox_entry;
1204     }
1205     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1206     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1207     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1208     local_smsg_attr->mbox_offset = mailbox_list->offset;
1209     mailbox_list->offset += smsg_memlen;
1210     local_smsg_attr->buff_size = smsg_memlen;
1211     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1212     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1213 }
1214
1215 /* useDynamicSMSG */
1216 inline 
1217 static int connect_to(int destNode)
1218 {
1219     gni_return_t status = GNI_RC_NOT_DONE;
1220     CmiAssert(smsg_connected_flag[destNode] == 0);
1221     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1222     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1223     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1224     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1225     
1226     CMI_GNI_LOCK(global_gni_lock)
1227     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1228     CMI_GNI_UNLOCK(global_gni_lock)
1229     if (status == GNI_RC_ERROR_RESOURCE) {
1230       /* possibly destNode is making connection at the same time */
1231       free(smsg_attr_vector_local[destNode]);
1232       smsg_attr_vector_local[destNode] = NULL;
1233       free(smsg_attr_vector_remote[destNode]);
1234       smsg_attr_vector_remote[destNode] = NULL;
1235       mailbox_list->offset -= smsg_memlen;
1236       return 0;
1237     }
1238     GNI_RC_CHECK("GNI_Post", status);
1239     smsg_connected_flag[destNode] = 1;
1240     return 1;
1241 }
1242
1243 inline 
1244 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
1245 {
1246     unsigned int          remote_address;
1247     uint32_t              remote_id;
1248     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1249     gni_smsg_attr_t       *smsg_attr;
1250     gni_post_descriptor_t *pd;
1251     gni_post_state_t      post_state;
1252     char                  *real_data; 
1253
1254     if (useDynamicSMSG) {
1255         switch (smsg_connected_flag[destNode]) {
1256         case 0: 
1257             connect_to(destNode);         /* continue to case 1 */
1258         case 1:                           /* pending connection, do nothing */
1259             status = GNI_RC_NOT_DONE;
1260             if(inbuff ==0)
1261                 buffer_small_msgs(queue, msg, size, destNode, tag);
1262             return status;
1263         }
1264     }
1265 #if CMK_SMP
1266 #if ! ONE_SEND_QUEUE
1267     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1268 #endif
1269     {
1270 #else
1271     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1272     {
1273 #endif
1274         uint64_t *buf = NULL;
1275         int bufsize = 0;
1276         //CMI_GNI_LOCK(smsg_mailbox_lock)
1277         CMI_GNI_LOCK(default_tx_cq_lock)
1278 #if CMK_SMP_TRACE_COMMTHREAD
1279         int oldpe = -1;
1280         int oldeventid = -1;
1281         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1282         { 
1283             START_EVENT();
1284             if ( tag == SMALL_DATA_TAG)
1285                 real_data = (char*)msg; 
1286             else 
1287                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1288             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1289             TRACE_COMM_SET_COMM_MSGID(real_data);
1290         }
1291 #endif
1292 #if REMOTE_EVENT
1293         if (tag == LMSG_INIT_TAG) {
1294             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1295             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1296                 control_msg_tmp->ack_index = AckPool_getslot((void*)control_msg_tmp->source_addr);
1297         }
1298         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1299 #endif
1300         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
1301 #if CMK_SMP_TRACE_COMMTHREAD
1302         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1303 #endif
1304         CMI_GNI_UNLOCK(default_tx_cq_lock)
1305         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1306         if(status == GNI_RC_SUCCESS)
1307         {
1308 #if CMK_SMP_TRACE_COMMTHREAD
1309             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == SMALL_DATA_ACK_TAG || tag == LMSG_INIT_ACK_TAG)
1310             { 
1311                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1312             }
1313 #endif
1314             smsg_send_count ++;
1315         }else
1316             status = GNI_RC_ERROR_RESOURCE;
1317     }
1318     if(status != GNI_RC_SUCCESS && inbuff ==0)
1319         buffer_small_msgs(queue, msg, size, destNode, tag);
1320     return status;
1321 }
1322
1323 inline 
1324 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1325 {
1326     /* construct a control message and send */
1327     CONTROL_MSG         *control_msg_tmp;
1328     MallocControlMsg(control_msg_tmp);
1329     control_msg_tmp->source_addr = (uint64_t)msg;
1330     control_msg_tmp->seq_id    = seqno;
1331     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1332 #if REMOTE_EVENT
1333     control_msg_tmp->ack_index    =  -1;
1334 #endif
1335 #if     USE_LRTS_MEMPOOL
1336     if(size < BIG_MSG)
1337     {
1338         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1339     }
1340     else
1341     {
1342         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1343         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1344         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1345     }
1346 #else
1347     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1348 #endif
1349     return control_msg_tmp;
1350 }
1351
1352 #define BLOCKING_SEND_CONTROL    0
1353
1354 // Large message, send control to receiver, receiver register memory and do a GET, 
1355 // return 1 - send no success
1356 inline
1357 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1358 {
1359     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1360     uint32_t            vmdh_index  = -1;
1361     int                 size;
1362     int                 offset = 0;
1363     uint64_t            source_addr;
1364     int                 register_size; 
1365     void                *msg;
1366
1367     size    =   control_msg_tmp->total_length;
1368     source_addr = control_msg_tmp->source_addr;
1369     register_size = control_msg_tmp->length;
1370
1371 #if  USE_LRTS_MEMPOOL
1372     if( control_msg_tmp->seq_id == 0 ){
1373 #if BLOCKING_SEND_CONTROL
1374         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1375             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1376                 LrtsAdvanceCommunication(0);
1377         }
1378 #endif
1379         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1380         {
1381             msg = (void*)source_addr;
1382             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1383             {
1384                 if(!inbuff)
1385                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1386                 return GNI_RC_ERROR_NOMEM;
1387             }
1388             //register the corresponding mempool
1389             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1390             if(status == GNI_RC_SUCCESS)
1391             {
1392                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1393             }
1394         }else
1395         {
1396             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1397             status = GNI_RC_SUCCESS;
1398         }
1399         if(NoMsgInSend(source_addr))
1400             register_size = GetMempoolsize((void*)(source_addr));
1401         else
1402             register_size = 0;
1403     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1404     {
1405         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1406         source_addr += offset;
1407         size = control_msg_tmp->length;
1408 #if BLOCKING_SEND_CONTROL
1409         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1410             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1411                 LrtsAdvanceCommunication(0);
1412         }
1413 #endif
1414         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1415             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1416             {
1417                 if(!inbuff)
1418                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1419                 return GNI_RC_ERROR_NOMEM;
1420             }
1421             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1422             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1423         }
1424         else
1425         {
1426             status = GNI_RC_SUCCESS;
1427         }
1428         register_size = 0;  
1429     }
1430
1431     if(status == GNI_RC_SUCCESS)
1432     {
1433         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff);  
1434         if(status == GNI_RC_SUCCESS)
1435         {
1436             buffered_send_msg += register_size;
1437             if(control_msg_tmp->seq_id == 0)
1438             {
1439                 IncreaseMsgInSend(source_addr);
1440             }
1441             FreeControlMsg(control_msg_tmp);
1442             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1443         }else
1444             status = GNI_RC_ERROR_RESOURCE;
1445
1446     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1447     {
1448         CmiAbort("Memory registor for large msg\n");
1449     }else 
1450     {
1451         status = GNI_RC_ERROR_NOMEM; 
1452         if(!inbuff)
1453             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1454     }
1455     return status;
1456 #else
1457     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1458     if(status == GNI_RC_SUCCESS)
1459     {
1460         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0);  
1461         if(status == GNI_RC_SUCCESS)
1462         {
1463             FreeControlMsg(control_msg_tmp);
1464         }
1465     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1466     {
1467         CmiAbort("Memory registor for large msg\n");
1468     }else 
1469     {
1470         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1471     }
1472     return status;
1473 #endif
1474 }
1475
1476 inline void LrtsPrepareEnvelope(char *msg, int size)
1477 {
1478     CmiSetMsgSize(msg, size);
1479 }
1480
1481 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1482 {
1483     gni_return_t        status  =   GNI_RC_SUCCESS;
1484     uint8_t tag;
1485     CONTROL_MSG         *control_msg_tmp;
1486     int                 oob = ( mode & OUT_OF_BAND);
1487     SMSG_QUEUE          *queue;
1488
1489     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1490 #if CMK_USE_OOB
1491     queue = oob? &smsg_oob_queue : &smsg_queue;
1492 #else
1493     queue = &smsg_queue;
1494 #endif
1495
1496     LrtsPrepareEnvelope(msg, size);
1497
1498 #if PRINT_SYH
1499     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1500 #endif 
1501 #if CMK_SMP 
1502     if(size <= SMSG_MAX_MSG)
1503         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1504     else if (size < BIG_MSG) {
1505         control_msg_tmp =  construct_control_msg(size, msg, 0);
1506         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1507     }
1508     else {
1509           CmiSetMsgSeq(msg, 0);
1510           control_msg_tmp =  construct_control_msg(size, msg, 1);
1511           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1512     }
1513 #else   //non-smp, smp(worker sending)
1514     if(size <= SMSG_MAX_MSG)
1515     {
1516         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1517             CmiFree(msg);
1518     }
1519     else if (size < BIG_MSG) {
1520         control_msg_tmp =  construct_control_msg(size, msg, 0);
1521         send_large_messages(queue, destNode, control_msg_tmp, 0);
1522     }
1523     else {
1524 #if     USE_LRTS_MEMPOOL
1525         CmiSetMsgSeq(msg, 0);
1526         control_msg_tmp =  construct_control_msg(size, msg, 1);
1527         send_large_messages(queue, destNode, control_msg_tmp, 0);
1528 #else
1529         control_msg_tmp =  construct_control_msg(size, msg, 0);
1530         send_large_messages(queue, destNode, control_msg_tmp, 0);
1531 #endif
1532     }
1533 #endif
1534     return 0;
1535 }
1536
1537 static void    PumpDatagramConnection();
1538 static      int         event_SetupConnect = 111;
1539 static      int         event_PumpSmsg = 222 ;
1540 static      int         event_PumpTransaction = 333;
1541 static      int         event_PumpRdmaTransaction = 444;
1542 static      int         event_SendBufferSmsg = 444;
1543 static      int         event_SendFmaRdmaMsg = 555;
1544 static      int         event_AdvanceCommunication = 666;
1545
1546 static void registerUserTraceEvents() {
1547 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1548     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1549     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1550     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1551     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1552     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1553     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1554     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1555 #endif
1556 }
1557
1558 static void ProcessDeadlock()
1559 {
1560     static CmiUInt8 *ptr = NULL;
1561     static CmiUInt8  last = 0, mysum, sum;
1562     static int count = 0;
1563     gni_return_t status;
1564     int i;
1565
1566 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1567 //sweep_mempool(CpvAccess(mempool));
1568     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1569     mysum = smsg_send_count + smsg_recv_count;
1570     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1571     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1572     GNI_RC_CHECK("PMI_Allgather", status);
1573     sum = 0;
1574     for (i=0; i<mysize; i++)  sum+= ptr[i];
1575     if (last == 0 || sum == last) 
1576         count++;
1577     else
1578         count = 0;
1579     last = sum;
1580     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1581     if (count == 2) { 
1582         /* detected twice, it is a real deadlock */
1583         if (myrank == 0)  {
1584             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1585             CmiAbort("Fatal> Deadlock detected.");
1586         }
1587
1588     }
1589     _detected_hang = 0;
1590 }
1591
1592 static void CheckProgress()
1593 {
1594     if (smsg_send_count == last_smsg_send_count &&
1595         smsg_recv_count == last_smsg_recv_count ) 
1596     {
1597         _detected_hang = 1;
1598 #if !CMK_SMP
1599         if (_detected_hang) ProcessDeadlock();
1600 #endif
1601
1602     }
1603     else {
1604         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1605         last_smsg_send_count = smsg_send_count;
1606         last_smsg_recv_count = smsg_recv_count;
1607         _detected_hang = 0;
1608     }
1609 }
1610
1611 static void set_limit()
1612 {
1613     //if (!user_set_flag && CmiMyRank() == 0) {
1614     if (CmiMyRank() == 0) {
1615         int mynode = CmiPhysicalNodeID(CmiMyPe());
1616         int numpes = CmiNumPesOnPhysicalNode(mynode);
1617         int numprocesses = numpes / CmiMyNodeSize();
1618         MAX_REG_MEM  = _totalmem / numprocesses;
1619         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1620         if (CmiMyPe() == 0)
1621            printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM, MAX_BUFF_SEND);
1622         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1623         {
1624              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1625              CmiAbort("memory registration\n");
1626         }
1627     }
1628 }
1629
1630 void LrtsPostCommonInit(int everReturn)
1631 {
1632 #if CMK_DIRECT
1633     CmiDirectInit();
1634 #endif
1635 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1636     CpvInitialize(double, projTraceStart);
1637     /* only PE 0 needs to care about registration (to generate sts file). */
1638     //if (CmiMyPe() == 0) 
1639     {
1640         registerMachineUserEventsFunction(&registerUserTraceEvents);
1641     }
1642 #endif
1643
1644 #if CMK_SMP
1645     CmiIdleState *s=CmiNotifyGetState();
1646     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1647     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1648 #else
1649     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1650     if (useDynamicSMSG)
1651     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1652 #endif
1653
1654 #if ! LARGEPAGE
1655     if (_checkProgress)
1656 #if CMK_SMP
1657     if (CmiMyRank() == 0)
1658 #endif
1659     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1660 #endif
1661  
1662 #if !LARGEPAGE
1663     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1664 #endif
1665 }
1666
1667 /* this is called by worker thread */
1668 void LrtsPostNonLocal(){
1669 #if CMK_SMP_TRACE_COMMTHREAD
1670     double startT, endT;
1671 #endif
1672 #if MULTI_THREAD_SEND
1673     if(mysize == 1) return;
1674 #if CMK_SMP_TRACE_COMMTHREAD
1675     traceEndIdle();
1676 #endif
1677
1678 #if CMK_SMP_TRACE_COMMTHREAD
1679     startT = CmiWallTimer();
1680 #endif
1681
1682 #if CMK_WORKER_SINGLE_TASK
1683     if (CmiMyRank() % 6 == 0)
1684 #endif
1685     PumpNetworkSmsg();
1686
1687 #if CMK_WORKER_SINGLE_TASK
1688     if (CmiMyRank() % 6 == 1)
1689 #endif
1690     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1691
1692 #if CMK_WORKER_SINGLE_TASK
1693     if (CmiMyRank() % 6 == 2)
1694 #endif
1695     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1696
1697 #if REMOTE_EVENT
1698 #if CMK_WORKER_SINGLE_TASK
1699     if (CmiMyRank() % 6 == 3)
1700 #endif
1701     PumpRemoteTransactions();
1702 #endif
1703
1704 #if CMK_USE_OOB
1705     if (SendBufferMsg(&smsg_oob_queue) == 1)
1706 #endif
1707     {
1708 #if CMK_WORKER_SINGLE_TASK
1709     if (CmiMyRank() % 6 == 4)
1710 #endif
1711         SendBufferMsg(&smsg_queue);
1712     }
1713
1714 #if CMK_WORKER_SINGLE_TASK
1715     if (CmiMyRank() % 6 == 5)
1716 #endif
1717     SendRdmaMsg();
1718
1719 #if CMK_SMP_TRACE_COMMTHREAD
1720     endT = CmiWallTimer();
1721     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
1722 #endif
1723 #if CMK_SMP_TRACE_COMMTHREAD
1724     traceBeginIdle();
1725 #endif
1726 #endif
1727 }
1728
1729 /* useDynamicSMSG */
1730 static void    PumpDatagramConnection()
1731 {
1732     uint32_t          remote_address;
1733     uint32_t          remote_id;
1734     gni_return_t status;
1735     gni_post_state_t  post_state;
1736     uint64_t          datagram_id;
1737     int i;
1738
1739    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1740    {
1741        if (datagram_id >= mysize) {           /* bound endpoint */
1742            int pe = datagram_id - mysize;
1743            CMI_GNI_LOCK(global_gni_lock)
1744            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1745            CMI_GNI_UNLOCK(global_gni_lock)
1746            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1747            {
1748                CmiAssert(remote_id == pe);
1749                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1750                GNI_RC_CHECK("Dynamic SMSG Init", status);
1751 #if PRINT_SYH
1752                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1753 #endif
1754                CmiAssert(smsg_connected_flag[pe] == 1);
1755                smsg_connected_flag[pe] = 2;
1756            }
1757        }
1758        else {         /* unbound ep */
1759            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1760            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1761            {
1762                CmiAssert(remote_id<mysize);
1763                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1764                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1765                GNI_RC_CHECK("Dynamic SMSG Init", status);
1766 #if PRINT_SYH
1767                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1768 #endif
1769                smsg_connected_flag[remote_id] = 2;
1770
1771                alloc_smsg_attr(&send_smsg_attr);
1772                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1773                GNI_RC_CHECK("post unbound datagram", status);
1774            }
1775        }
1776    }
1777 }
1778
1779 /* pooling CQ to receive network message */
1780 static void PumpNetworkRdmaMsgs()
1781 {
1782     gni_cq_entry_t      event_data;
1783     gni_return_t        status;
1784
1785 }
1786
1787 inline 
1788 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
1789 {
1790     RDMA_REQUEST        *rdma_request_msg;
1791     MallocRdmaRequest(rdma_request_msg);
1792     rdma_request_msg->destNode = inst_id;
1793     rdma_request_msg->pd = pd;
1794 #if REMOTE_EVENT
1795     rdma_request_msg->ack_index = ack_index;
1796 #endif
1797 #if CMK_SMP
1798     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1799 #else
1800     if(sendRdmaBuf == 0)
1801     {
1802         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1803     }else{
1804         sendRdmaTail->next = rdma_request_msg;
1805         sendRdmaTail =  rdma_request_msg;
1806     }
1807 #endif
1808
1809 }
1810
1811 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1812
1813 static void PumpNetworkSmsg()
1814 {
1815     uint64_t            inst_id;
1816     int                 ret;
1817     gni_cq_entry_t      event_data;
1818     gni_return_t        status, status2;
1819     void                *header;
1820     uint8_t             msg_tag;
1821     int                 msg_nbytes;
1822     void                *msg_data;
1823     gni_mem_handle_t    msg_mem_hndl;
1824     gni_smsg_attr_t     *smsg_attr;
1825     gni_smsg_attr_t     *remote_smsg_attr;
1826     int                 init_flag;
1827     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1828     uint64_t            source_addr;
1829     SMSG_QUEUE         *queue = &smsg_queue;
1830 #if     CMK_DIRECT
1831     cmidirectMsg        *direct_msg;
1832 #endif
1833     while(1)
1834     {
1835         CMI_GNI_LOCK(smsg_rx_cq_lock)
1836         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1837         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
1838         if(status != GNI_RC_SUCCESS)
1839             break;
1840         inst_id = GNI_CQ_GET_INST_ID(event_data);
1841 #if REMOTE_EVENT
1842         inst_id = ACK_GET_RANK(inst_id);
1843 #endif
1844         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1845 #if PRINT_SYH
1846         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1847 #endif
1848         if (useDynamicSMSG) {
1849             /* subtle: smsg may come before connection is setup */
1850             while (smsg_connected_flag[inst_id] != 2) 
1851                PumpDatagramConnection();
1852         }
1853         msg_tag = GNI_SMSG_ANY_TAG;
1854         while(1) {
1855             CMI_GNI_LOCK(smsg_mailbox_lock)
1856             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1857             if (status != GNI_RC_SUCCESS)
1858             {
1859                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1860                 break;
1861             }
1862 #if PRINT_SYH
1863             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1864 #endif
1865             /* copy msg out and then put into queue (small message) */
1866             switch (msg_tag) {
1867             case SMALL_DATA_TAG:
1868             {
1869                 START_EVENT();
1870                 msg_nbytes = CmiGetMsgSize(header);
1871                 msg_data    = CmiAlloc(msg_nbytes);
1872                 memcpy(msg_data, (char*)header, msg_nbytes);
1873                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1874                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1875                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1876                 handleOneRecvedMsg(msg_nbytes, msg_data);
1877                 break;
1878             }
1879             case LMSG_INIT_TAG:
1880             {
1881 #if MULTI_THREAD_SEND
1882                 MallocControlMsg(control_msg_tmp);
1883                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
1884                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1885                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1886                 getLargeMsgRequest(control_msg_tmp, inst_id);
1887                 FreeControlMsg(control_msg_tmp);
1888 #else
1889                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1890                 getLargeMsgRequest(header, inst_id);
1891                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1892 #endif
1893                 break;
1894             }
1895             case ACK_TAG:   //msg fit into mempool
1896             {
1897                 /* Get is done, release message . Now put is not used yet*/
1898                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1899                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1900                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1901 #if ! USE_LRTS_MEMPOOL
1902                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1903 #else
1904                 DecreaseMsgInSend(msg);
1905 #endif
1906                 if(NoMsgInSend(msg))
1907                     buffered_send_msg -= GetMempoolsize(msg);
1908                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1909                 CmiFree(msg);
1910                 break;
1911             }
1912             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1913             {
1914 #if MULTI_THREAD_SEND
1915                 MallocControlMsg(header_tmp);
1916                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
1917                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1918 #else
1919                 header_tmp = (CONTROL_MSG *) header;
1920 #endif
1921                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1922                 void *msg = (void*)(header_tmp->source_addr);
1923                 int cur_seq = CmiGetMsgSeq(msg);
1924                 int offset = ONE_SEG*(cur_seq+1);
1925                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1926                 buffered_send_msg -= header_tmp->length;
1927                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1928                 if (remain_size < 0) remain_size = 0;
1929                 CmiSetMsgSize(msg, remain_size);
1930                 if(remain_size <= 0) //transaction done
1931                 {
1932                     CmiFree(msg);
1933                 }else if (header_tmp->total_length > offset)
1934                 {
1935                     CmiSetMsgSeq(msg, cur_seq+1);
1936                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1937                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1938                     //send next seg
1939                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1940                          // pipelining
1941                     if (header_tmp->seq_id == 1) {
1942                       int i;
1943                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1944                         int seq = cur_seq+i+2;
1945                         CmiSetMsgSeq(msg, seq-1);
1946                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1947                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1948                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1949                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1950                       }
1951                     }
1952                 }
1953 #if MULTI_THREAD_SEND
1954                 FreeControlMsg(header_tmp);
1955 #else
1956                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1957 #endif
1958                 break;
1959             }
1960 #if CMK_PERSISTENT_COMM
1961             case PUT_DONE_TAG: //persistent message
1962                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1963                 int size = ((CONTROL_MSG *) header)->length;
1964                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1965                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1966                 CmiReference(msg);
1967                 handleOneRecvedMsg(size, msg); 
1968                 break;
1969 #endif
1970 #if CMK_DIRECT
1971             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1972                 //create a trigger message
1973                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
1974                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
1975                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1976                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1977                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
1978                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
1979                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1980                 break;
1981 #endif
1982             default: {
1983                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1984                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1985                 printf("weird tag problem\n");
1986                 CmiAbort("Unknown tag\n");
1987                      }
1988             }               // end switch
1989 #if PRINT_SYH
1990             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1991 #endif
1992             smsg_recv_count ++;
1993             msg_tag = GNI_SMSG_ANY_TAG;
1994         } //endwhile getNext
1995     }   //end while GetEvent
1996     if(status == GNI_RC_ERROR_RESOURCE)
1997     {
1998         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
1999         GNI_RC_CHECK("Smsg_rx_cq full", status);
2000     }
2001 }
2002
2003 static void printDesc(gni_post_descriptor_t *pd)
2004 {
2005     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2006 }
2007
2008 #if CQWRITE
2009 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2010 {
2011     gni_post_descriptor_t *pd;
2012     gni_return_t        status = GNI_RC_SUCCESS;
2013     
2014     MallocPostDesc(pd);
2015     pd->type = GNI_POST_CQWRITE;
2016     pd->cq_mode = GNI_CQMODE_SILENT;
2017     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2018     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2019     pd->cqwrite_value = data;
2020     pd->remote_mem_hndl = mem_hndl;
2021     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2022     GNI_RC_CHECK("GNI_PostCqWrite", status);
2023 }
2024 #endif
2025
2026 // for BIG_MSG called on receiver side for receiving control message
2027 // LMSG_INIT_TAG
2028 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2029 {
2030 #if     USE_LRTS_MEMPOOL
2031     CONTROL_MSG         *request_msg;
2032     gni_return_t        status = GNI_RC_SUCCESS;
2033     void                *msg_data;
2034     gni_post_descriptor_t *pd;
2035     gni_mem_handle_t    msg_mem_hndl;
2036     int source, size, transaction_size, offset = 0;
2037     size_t     register_size = 0;
2038
2039     // initial a get to transfer data from the sender side */
2040     request_msg = (CONTROL_MSG *) header;
2041     size = request_msg->total_length;
2042     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2043     MallocPostDesc(pd);
2044     if(request_msg->seq_id < 2)   {
2045         msg_data = CmiAlloc(size);
2046         CmiSetMsgSeq(msg_data, 0);
2047         _MEMCHECK(msg_data);
2048 #if CMK_SMP_TRACE_COMMTHREAD
2049         pd->second_operand = 1000000 * CmiWallTimer(); //microsecond
2050 #endif
2051     }
2052     else {
2053         offset = ONE_SEG*(request_msg->seq_id-1);
2054         msg_data = (char*)request_msg->dest_addr + offset;
2055     }
2056    
2057     pd->cqwrite_value = request_msg->seq_id;
2058     if( request_msg->seq_id == 0)
2059     {
2060         pd->local_mem_hndl= GetMemHndl(msg_data);
2061         transaction_size = ALIGN64(size);
2062         if(IsMemHndlZero(pd->local_mem_hndl))
2063         {   
2064             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)), rdma_rx_cqh);
2065             if(status == GNI_RC_SUCCESS)
2066             {
2067                 pd->local_mem_hndl = GetMemHndl(msg_data);
2068             }
2069             else
2070             {
2071                 SetMemHndlZero(pd->local_mem_hndl);
2072             }
2073         }
2074         if(NoMsgInRecv( (void*)(msg_data)))
2075             register_size = GetMempoolsize((void*)(msg_data));
2076         else
2077             register_size = 0;
2078     }
2079     else{
2080         transaction_size = ALIGN64(request_msg->length);
2081         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl), NULL); 
2082         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2083         {
2084             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2085         }
2086     }
2087     pd->first_operand = ALIGN64(size);                   //  total length
2088
2089     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2090         pd->type            = GNI_POST_FMA_GET;
2091     else
2092         pd->type            = GNI_POST_RDMA_GET;
2093     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2094     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2095     pd->length          = transaction_size;
2096     pd->local_addr      = (uint64_t) msg_data;
2097     pd->remote_addr     = request_msg->source_addr + offset;
2098     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2099     pd->src_cq_hndl     = rdma_tx_cqh;
2100     pd->rdma_mode       = 0;
2101     pd->amo_cmd         = 0;
2102
2103     //memory registration success
2104     if(status == GNI_RC_SUCCESS)
2105     {
2106         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2107         CMI_GNI_LOCK(lock)
2108 #if REMOTE_EVENT
2109         if( request_msg->seq_id == 0)
2110         {
2111             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2112             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2113             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2114         }
2115         else {
2116             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, myrank);
2117             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2118         }
2119 #endif
2120         if(pd->type == GNI_POST_RDMA_GET) 
2121         {
2122             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2123         }
2124         else
2125         {
2126             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2127         }
2128         CMI_GNI_UNLOCK(lock)
2129
2130         if(status == GNI_RC_SUCCESS )
2131         {
2132             if(pd->cqwrite_value == 0)
2133             {
2134 #if MACHINE_DEBUG_LOG
2135                 buffered_recv_msg += register_size;
2136                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2137 #endif
2138                 IncreaseMsgInRecv(msg_data);
2139             }
2140         }
2141     }else
2142     {
2143         SetMemHndlZero((pd->local_mem_hndl));
2144     }
2145     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2146     {
2147 #if REMOTE_EVENT
2148         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2149 #else
2150         bufferRdmaMsg(inst_id, pd, -1); 
2151 #endif
2152     }else {
2153          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
2154         GNI_RC_CHECK("GetLargeAFter posting", status);
2155     }
2156 #else
2157     CONTROL_MSG         *request_msg;
2158     gni_return_t        status;
2159     void                *msg_data;
2160     gni_post_descriptor_t *pd;
2161     RDMA_REQUEST        *rdma_request_msg;
2162     gni_mem_handle_t    msg_mem_hndl;
2163     //int source;
2164     // initial a get to transfer data from the sender side */
2165     request_msg = (CONTROL_MSG *) header;
2166     msg_data = CmiAlloc(request_msg->length);
2167     _MEMCHECK(msg_data);
2168
2169     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2170
2171     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2172     {
2173         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2174     }
2175
2176     MallocPostDesc(pd);
2177     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2178         pd->type            = GNI_POST_FMA_GET;
2179     else
2180         pd->type            = GNI_POST_RDMA_GET;
2181     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2182     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2183     pd->length          = ALIGN64(request_msg->length);
2184     pd->local_addr      = (uint64_t) msg_data;
2185     pd->remote_addr     = request_msg->source_addr;
2186     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2187     pd->src_cq_hndl     = rdma_tx_cqh;
2188     pd->rdma_mode       = 0;
2189     pd->amo_cmd         = 0;
2190
2191     //memory registration successful
2192     if(status == GNI_RC_SUCCESS)
2193     {
2194         pd->local_mem_hndl  = msg_mem_hndl;
2195        
2196         if(pd->type == GNI_POST_RDMA_GET) 
2197         {
2198             CMI_GNI_LOCK(rdma_tx_cq_lock)
2199             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2200             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2201         }
2202         else
2203         {
2204             CMI_GNI_LOCK(default_tx_cq_lock)
2205             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2206             CMI_GNI_UNLOCK(default_tx_cq_lock)
2207         }
2208
2209     }else
2210     {
2211         SetMemHndlZero(pd->local_mem_hndl);
2212     }
2213     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2214     {
2215         MallocRdmaRequest(rdma_request_msg);
2216         rdma_request_msg->next = 0;
2217         rdma_request_msg->destNode = inst_id;
2218         rdma_request_msg->pd = pd;
2219         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2220     }else {
2221         GNI_RC_CHECK("AFter posting", status);
2222     }
2223 #endif
2224 }
2225
2226 #if CQWRITE
2227 static void PumpCqWriteTransactions()
2228 {
2229
2230     gni_cq_entry_t          ev;
2231     gni_return_t            status;
2232     void                    *msg;   
2233     while(1) {
2234         //CMI_GNI_LOCK(my_cq_lock) 
2235         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2236         //CMI_GNI_UNLOCK(my_cq_lock)
2237         if(status != GNI_RC_SUCCESS) break;
2238         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2239
2240         DecreaseMsgInSend(msg);
2241 #if ! USE_LRTS_MEMPOOL
2242        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2243 #else
2244         DecreaseMsgInSend(msg);
2245 #endif
2246         if(NoMsgInSend(msg))
2247             buffered_send_msg -= GetMempoolsize(msg);
2248         CmiFree(msg);
2249     };
2250     if(status == GNI_RC_ERROR_RESOURCE)
2251     {
2252         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2253     }
2254 }
2255 #endif
2256
2257 #if REMOTE_EVENT
2258 static void PumpRemoteTransactions()
2259 {
2260     gni_cq_entry_t          ev;
2261     gni_return_t            status;
2262     void                    *msg;   
2263     int                     slot;
2264
2265     while(1) {
2266         CMI_GNI_LOCK(global_gni_lock)
2267         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2268         CMI_GNI_UNLOCK(global_gni_lock)
2269         if(status != GNI_RC_SUCCESS) {
2270             break;
2271         }
2272
2273         slot = GNI_CQ_GET_INST_ID(ev);
2274         slot = ACK_GET_INDEX(slot);
2275         //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
2276
2277         //CMI_GNI_LOCK(ackpool_lock);
2278         msg = GetAckAddress(slot);
2279         //CMI_GNI_UNLOCK(ackpool_lock);
2280
2281         DecreaseMsgInSend(msg);
2282 #if ! USE_LRTS_MEMPOOL
2283        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2284 #else
2285         DecreaseMsgInSend(msg);
2286 #endif
2287         if(NoMsgInSend(msg))
2288             buffered_send_msg -= GetMempoolsize(msg);
2289         CmiFree(msg);
2290         AckPool_freeslot(slot);
2291     }
2292     if(status == GNI_RC_ERROR_RESOURCE)
2293     {
2294         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2295     }
2296 }
2297 #endif
2298
2299 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2300 {
2301     gni_cq_entry_t          ev;
2302     gni_return_t            status;
2303     uint64_t                type, inst_id;
2304     gni_post_descriptor_t   *tmp_pd;
2305     MSG_LIST                *ptr;
2306     CONTROL_MSG             *ack_msg_tmp;
2307     ACK_MSG                 *ack_msg;
2308     uint8_t                 msg_tag;
2309 #if CMK_DIRECT
2310     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2311 #endif
2312     SMSG_QUEUE         *queue = &smsg_queue;
2313
2314     while(1) {
2315         CMI_GNI_LOCK(my_cq_lock) 
2316         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2317         CMI_GNI_UNLOCK(my_cq_lock)
2318         if(status != GNI_RC_SUCCESS) break;
2319         
2320         type = GNI_CQ_GET_TYPE(ev);
2321         if (type == GNI_CQ_EVENT_TYPE_POST)
2322         {
2323             inst_id     = GNI_CQ_GET_INST_ID(ev);
2324 #if PRINT_SYH
2325             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2326 #endif
2327             CMI_GNI_LOCK(my_cq_lock)
2328             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2329             CMI_GNI_UNLOCK(my_cq_lock)
2330
2331             switch (tmp_pd->type) {
2332 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2333             case GNI_POST_RDMA_PUT:
2334 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2335                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2336 #endif
2337             case GNI_POST_FMA_PUT:
2338                 if(tmp_pd->amo_cmd == 1) {
2339                     //sender ACK to receiver to trigger it is done
2340                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2341                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2342                     msg_tag = DIRECT_PUT_DONE_TAG;
2343                 }
2344                 else {
2345                     CmiFree((void *)tmp_pd->local_addr);
2346                     MallocControlMsg(ack_msg_tmp);
2347                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2348                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2349                     msg_tag = PUT_DONE_TAG;
2350                 }
2351                 break;
2352 #endif
2353             case GNI_POST_RDMA_GET:
2354             case GNI_POST_FMA_GET:  {
2355 #if  ! USE_LRTS_MEMPOOL
2356                 MallocControlMsg(ack_msg_tmp);
2357                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2358                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2359                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2360                 msg_tag = ACK_TAG;  
2361 #else
2362                 int seq_id = tmp_pd->cqwrite_value;
2363                 if(seq_id > 0)      // BIG_MSG
2364                 {
2365                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2366                     MallocControlMsg(ack_msg_tmp);
2367                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2368                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2369                     ack_msg_tmp->seq_id = seq_id;
2370                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2371                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2372                     ack_msg_tmp->length = tmp_pd->length;
2373                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2374                     msg_tag = BIG_MSG_TAG; 
2375                 } 
2376                 else
2377                 {
2378                     msg_tag = ACK_TAG; 
2379 #if  !REMOTE_EVENT && !CQWRITE
2380                     MallocAckMsg(ack_msg);
2381                     ack_msg->source_addr = tmp_pd->remote_addr;
2382 #endif
2383                 }
2384 #endif
2385                 break;
2386             }
2387             case  GNI_POST_CQWRITE:
2388                    FreePostDesc(tmp_pd);
2389                    continue;
2390             default:
2391                 CmiPrintf("type=%d\n", tmp_pd->type);
2392                 CmiAbort("PumpLocalTransactions: unknown type!");
2393             }      /* end of switch */
2394
2395 #if CMK_DIRECT
2396             if (tmp_pd->amo_cmd == 1) {
2397                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
2398                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2399             }
2400             else
2401 #endif
2402             if (msg_tag == ACK_TAG) {
2403 #if !REMOTE_EVENT
2404 #if   !CQWRITE
2405                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
2406                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2407 #else
2408                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2409 #endif
2410 #endif
2411             }
2412             else {
2413                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
2414                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2415             }
2416 #if CMK_PERSISTENT_COMM
2417             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2418 #endif
2419             {
2420                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2421 #if PRINT_SYH
2422                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2423 #endif
2424                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->second_operand/1000000.0), (double)((tmp_pd->second_operand+1)/1000000.0), (double)((tmp_pd->second_operand+1)/1000000.0), (void*)tmp_pd->local_addr); 
2425
2426                     START_EVENT();
2427                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2428                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2429 #if MACHINE_DEBUG_LOG
2430                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2431                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2432                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2433 #endif
2434                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2435                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2436                 }else if(msg_tag == BIG_MSG_TAG){
2437                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2438                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2439                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2440                         START_EVENT();
2441 #if PRINT_SYH
2442                         printf("Pipeline msg done [%d]\n", myrank);
2443 #endif
2444 #if                 CMK_SMP_TRACE_COMMTHREAD
2445                         if( tmp_pd->cqwrite_value == 1)
2446                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->second_operand/1000000.0), (double)((tmp_pd->second_operand+1)/1000000.0), (double)((tmp_pd->second_operand+2)/1000000.0), (void*)tmp_pd->local_addr); 
2447 #endif
2448                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2449                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2450                     }
2451                 }
2452             }
2453             FreePostDesc(tmp_pd);
2454         }
2455     } //end while
2456     if(status == GNI_RC_ERROR_RESOURCE)
2457     {
2458         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2459         GNI_RC_CHECK("Smsg_tx_cq full", status);
2460     }
2461 }
2462
2463 static void  SendRdmaMsg()
2464 {
2465     gni_return_t            status = GNI_RC_SUCCESS;
2466     gni_mem_handle_t        msg_mem_hndl;
2467     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2468     RDMA_REQUEST            *pre = 0;
2469     uint64_t                register_size = 0;
2470     void                    *msg;
2471     int                     i;
2472 #if CMK_SMP
2473     int len = PCQueueLength(sendRdmaBuf);
2474     for (i=0; i<len; i++)
2475     {
2476         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2477         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2478         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2479         if (ptr == NULL) break;
2480 #else
2481     ptr = sendRdmaBuf;
2482     while (ptr!=0)
2483     {
2484 #endif 
2485         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2486         gni_post_descriptor_t *pd = ptr->pd;
2487         status = GNI_RC_SUCCESS;
2488         
2489         if(pd->cqwrite_value == 0)
2490         {
2491             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2492             {
2493                 msg = (void*)(pd->local_addr);
2494                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2495                 if(status == GNI_RC_SUCCESS)
2496                 {
2497                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2498                 }
2499             }else
2500             {
2501                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2502             }
2503             if(NoMsgInRecv( (void*)(pd->local_addr)))
2504                 register_size = GetMempoolsize((void*)(pd->local_addr));
2505             else
2506                 register_size = 0;
2507         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2508         {
2509             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl), NULL); 
2510         }
2511         if(status == GNI_RC_SUCCESS)        //mem register good
2512         {
2513             CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET? rdma_tx_cq_lock:default_tx_cq_lock;
2514             CMI_GNI_LOCK(lock);
2515 #if REMOTE_EVENT
2516             if( pd->cqwrite_value == 0)
2517             {
2518                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2519                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2520                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2521             }
2522             else {
2523                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, myrank);
2524                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2525             }
2526 #endif
2527             if(pd->type == GNI_POST_RDMA_GET) 
2528             {
2529                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2530             }
2531             else
2532             {
2533                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2534             }
2535             CMI_GNI_UNLOCK(lock);
2536             
2537             if(status == GNI_RC_SUCCESS)    //post good
2538             {
2539 #if !CMK_SMP
2540                 tmp_ptr = ptr;
2541                 if(pre != 0) {
2542                     pre->next = ptr->next;
2543                 }
2544                 else {
2545                     sendRdmaBuf = ptr->next;
2546                 }
2547                 ptr = ptr->next;
2548                 FreeRdmaRequest(tmp_ptr);
2549 #endif
2550                 if(pd->cqwrite_value == 0)
2551                 {
2552                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2553                 }
2554 #if MACHINE_DEBUG_LOG
2555                 buffered_recv_msg += register_size;
2556                 MACHSTATE(8, "GO request from buffered\n"); 
2557 #endif
2558             }else           // cannot post
2559             {
2560 #if CMK_SMP
2561                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2562 #else
2563                 pre = ptr;
2564                 ptr = ptr->next;
2565 #endif
2566                 break;
2567             }
2568         } else          //memory registration fails
2569         {
2570 #if CMK_SMP
2571             PCQueuePush(sendRdmaBuf, (char*)ptr);
2572 #else
2573             pre = ptr;
2574             ptr = ptr->next;
2575 #endif
2576         }
2577     } //end while
2578 #if ! CMK_SMP
2579     if(ptr == 0)
2580         sendRdmaTail = pre;
2581 #endif
2582 }
2583
2584 // return 1 if all messages are sent
2585 static int SendBufferMsg(SMSG_QUEUE *queue)
2586 {
2587     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2588     CONTROL_MSG         *control_msg_tmp;
2589     gni_return_t        status;
2590     int                 done = 1;
2591     uint64_t            register_size;
2592     void                *register_addr;
2593     int                 index_previous = -1;
2594 #if CMI_EXERT_SEND_CAP
2595     int                 sent_cnt = 0;
2596 #endif
2597
2598 #if CMK_SMP
2599     int          index = 0;
2600 #if ONE_SEND_QUEUE
2601     memset(destpe_avail, 0, mysize * sizeof(char));
2602     for (index=0; index<1; index++)
2603     {
2604         int i, len = PCQueueLength(queue->sendMsgBuf);
2605         for (i=0; i<len; i++) 
2606         {
2607             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2608             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2609             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2610             if(ptr == NULL) break;
2611             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2612                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2613                 continue;
2614             }
2615 #else
2616 #if SMP_LOCKS
2617     int nonempty = PCQueueLength(nonEmptyQueues);
2618     for(index =0; index<nonempty; index++)
2619     {
2620         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2621         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2622         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2623         if(current_list == NULL) break; 
2624         PCQueue current_queue= current_list-> sendSmsgBuf;
2625         CmiLock(current_list->lock);
2626         int i, len = PCQueueLength(current_queue);
2627         current_list->pushed = 0;
2628         CmiUnlock(current_list->lock);
2629 #else
2630     for(index =0; index<mysize; index++)
2631     {
2632         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2633         int i, len = PCQueueLength(current_queue);
2634 #endif
2635         for (i=0; i<len; i++) 
2636         {
2637             CMI_PCQUEUEPOP_LOCK(current_queue)
2638             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2639             CMI_PCQUEUEPOP_UNLOCK(current_queue)
2640             if (ptr == 0) break;
2641 #endif
2642 #else
2643     int index = queue->smsg_head_index;
2644     while(index != -1)
2645     {
2646         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2647         pre = 0;
2648         while(ptr != 0)
2649         {
2650 #endif
2651             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2652             status = GNI_RC_ERROR_RESOURCE;
2653             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2654                 /* connection not exists yet */
2655             }
2656             else
2657             switch(ptr->tag)
2658             {
2659             case SMALL_DATA_TAG:
2660                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2661                 if(status == GNI_RC_SUCCESS)
2662                 {
2663                     CmiFree(ptr->msg);
2664                 }
2665                 break;
2666             case LMSG_INIT_TAG:
2667                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2668                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2669                 break;
2670             case ACK_TAG:
2671                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2672                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2673                 break;
2674             case BIG_MSG_TAG:
2675                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2676                 if(status == GNI_RC_SUCCESS)
2677                 {
2678                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2679                 }
2680                 break;
2681 #if CMK_DIRECT
2682             case DIRECT_PUT_DONE_TAG:
2683                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2684                 if(status == GNI_RC_SUCCESS)
2685                 {
2686                     free((CMK_DIRECT_HEADER*)ptr->msg);
2687                 }
2688                 break;
2689
2690 #endif
2691             default:
2692                 printf("Weird tag\n");
2693                 CmiAbort("should not happen\n");
2694             }       // end switch
2695             if(status == GNI_RC_SUCCESS)
2696             {
2697 #if PRINT_SYH
2698                 buffered_smsg_counter--;
2699                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2700 #endif
2701 #if !CMK_SMP
2702                 tmp_ptr = ptr;
2703                 if(pre)
2704                 {
2705                     ptr = pre ->next = ptr->next;
2706                 }else
2707                 {
2708                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2709                 }
2710                 FreeMsgList(tmp_ptr);
2711 #else
2712                 FreeMsgList(ptr);
2713 #endif
2714 #if CMI_EXERT_SEND_CAP
2715                 sent_cnt++;
2716                 if(sent_cnt == SEND_CAP)
2717                     break;
2718 #endif
2719             }else {
2720 #if CMK_SMP
2721 #if ONE_SEND_QUEUE
2722                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2723 #else
2724                 PCQueuePush(current_queue, (char*)ptr);
2725 #endif
2726 #else
2727                 pre = ptr;
2728                 ptr=ptr->next;
2729 #endif
2730                 done = 0;
2731                 if(status == GNI_RC_ERROR_RESOURCE)
2732                 {
2733 #if CMK_SMP && ONE_SEND_QUEUE 
2734                     destpe_avail[ptr->destNode] = 1;
2735 #else
2736                     break;
2737 #endif
2738                 }
2739             } 
2740         } //end while
2741 #if !CMK_SMP
2742         if(ptr == 0)
2743             queue->smsg_msglist_index[index].tail = pre;
2744         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2745         {
2746             if(index_previous != -1)
2747                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2748             else
2749                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2750         }
2751         else {
2752             index_previous = index;
2753         }
2754         index = queue->smsg_msglist_index[index].next;
2755 #else
2756 #if !ONE_SEND_QUEUE && SMP_LOCKS
2757         CmiLock(current_list->lock);
2758         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2759         {
2760             current_list->pushed = 1;
2761             PCQueuePush(nonEmptyQueues, current_list);
2762         }
2763         CmiUnlock(current_list->lock); 
2764 #endif
2765 #endif
2766
2767 #if CMI_EXERT_SEND_CAP
2768         if(sent_cnt == SEND_CAP)
2769                 break;
2770 #endif
2771     }   // end pooling for all cores
2772     return done;
2773 }
2774
2775 static void ProcessDeadlock();
2776 void LrtsAdvanceCommunication(int whileidle)
2777 {
2778     static int count = 0;
2779     /*  Receive Msg first */
2780 #if CMK_SMP_TRACE_COMMTHREAD
2781     double startT, endT;
2782 #endif
2783     if (useDynamicSMSG && whileidle)
2784     {
2785 #if CMK_SMP_TRACE_COMMTHREAD
2786         startT = CmiWallTimer();
2787 #endif
2788         PumpDatagramConnection();
2789 #if CMK_SMP_TRACE_COMMTHREAD
2790         endT = CmiWallTimer();
2791         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
2792 #endif
2793     }
2794
2795 #if CMK_SMP_TRACE_COMMTHREAD
2796     startT = CmiWallTimer();
2797 #endif
2798     PumpNetworkSmsg();
2799     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2800 #if CMK_SMP_TRACE_COMMTHREAD
2801     endT = CmiWallTimer();
2802     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
2803 #endif
2804
2805 #if CMK_SMP_TRACE_COMMTHREAD
2806     startT = CmiWallTimer();
2807 #endif
2808     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2809     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
2810 #if CMK_SMP_TRACE_COMMTHREAD
2811     endT = CmiWallTimer();
2812     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
2813 #endif
2814
2815 #if CMK_SMP_TRACE_COMMTHREAD
2816     startT = CmiWallTimer();
2817 #endif
2818     PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock);
2819
2820 #if CQWRITE
2821     PumpCqWriteTransactions();
2822 #endif
2823
2824 #if REMOTE_EVENT
2825     PumpRemoteTransactions();
2826 #endif
2827
2828     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
2829 #if CMK_SMP_TRACE_COMMTHREAD
2830     endT = CmiWallTimer();
2831     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
2832 #endif
2833  
2834     /* Send buffered Message */
2835 #if CMK_SMP_TRACE_COMMTHREAD
2836     startT = CmiWallTimer();
2837 #endif
2838 #if CMK_USE_OOB
2839     if (SendBufferMsg(&smsg_oob_queue) == 1)
2840 #endif
2841     {
2842     SendBufferMsg(&smsg_queue);
2843     }
2844     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2845 #if CMK_SMP_TRACE_COMMTHREAD
2846     endT = CmiWallTimer();
2847     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
2848 #endif
2849
2850 #if CMK_SMP_TRACE_COMMTHREAD
2851     startT = CmiWallTimer();
2852 #endif
2853     SendRdmaMsg();
2854     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2855 #if CMK_SMP_TRACE_COMMTHREAD
2856     endT = CmiWallTimer();
2857     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
2858 #endif
2859
2860 #if CMK_SMP && ! LARGEPAGE
2861     if (_detected_hang)  ProcessDeadlock();
2862 #endif
2863 }
2864
2865 /* useDynamicSMSG */
2866 static void _init_dynamic_smsg()
2867 {
2868     gni_return_t status;
2869     uint32_t     vmdh_index = -1;
2870     int i;
2871
2872     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2873     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2874     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2875     for(i=0; i<mysize; i++) {
2876         smsg_connected_flag[i] = 0;
2877         smsg_attr_vector_local[i] = NULL;
2878         smsg_attr_vector_remote[i] = NULL;
2879     }
2880     if(mysize <=512)
2881     {
2882         SMSG_MAX_MSG = 4096;
2883     }else if (mysize <= 4096)
2884     {
2885         SMSG_MAX_MSG = 4096/mysize * 1024;
2886     }else if (mysize <= 16384)
2887     {
2888         SMSG_MAX_MSG = 512;
2889     }else {
2890         SMSG_MAX_MSG = 256;
2891     }
2892
2893     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2894     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2895     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2896     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2897     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2898
2899     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2900     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2901     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2902     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2903     mailbox_list->offset = 0;
2904     mailbox_list->next = 0;
2905     
2906     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2907         mailbox_list->size, smsg_rx_cqh,
2908         GNI_MEM_READWRITE,   
2909         vmdh_index,
2910         &(mailbox_list->mem_hndl));
2911     GNI_RC_CHECK("MEMORY registration for smsg", status);
2912
2913     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
2914     GNI_RC_CHECK("Unbound EP", status);
2915     
2916     alloc_smsg_attr(&send_smsg_attr);
2917
2918     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2919     GNI_RC_CHECK("post unbound datagram", status);
2920
2921       /* always pre-connect to proc 0 */
2922     //if (myrank != 0) connect_to(0);
2923 }
2924
2925 static void _init_static_smsg()
2926 {
2927     gni_smsg_attr_t      *smsg_attr;
2928     gni_smsg_attr_t      remote_smsg_attr;
2929     gni_smsg_attr_t      *smsg_attr_vec;
2930     gni_mem_handle_t     my_smsg_mdh_mailbox;
2931     int      ret, i;
2932     gni_return_t status;
2933     uint32_t              vmdh_index = -1;
2934     mdh_addr_t            base_infor;
2935     mdh_addr_t            *base_addr_vec;
2936     char *env;
2937
2938     if(mysize <=512)
2939     {
2940         SMSG_MAX_MSG = 1024;
2941     }else if (mysize <= 4096)
2942     {
2943         SMSG_MAX_MSG = 1024;
2944     }else if (mysize <= 16384)
2945     {
2946         SMSG_MAX_MSG = 512;
2947     }else {
2948         SMSG_MAX_MSG = 256;
2949     }
2950     
2951     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2952     if (env) SMSG_MAX_MSG = atoi(env);
2953     CmiAssert(SMSG_MAX_MSG > 0);
2954
2955     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2956     
2957     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2958     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2959     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2960     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2961     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2962     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2963     CmiAssert(ret == 0);
2964     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2965     
2966     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2967             smsg_memlen*(mysize), smsg_rx_cqh,
2968             GNI_MEM_READWRITE,   
2969             vmdh_index,
2970             &my_smsg_mdh_mailbox);
2971     register_memory_size += smsg_memlen*(mysize);
2972     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2973
2974     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2975     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2976
2977     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2978     base_infor.mdh =  my_smsg_mdh_mailbox;
2979     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2980
2981     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2982  
2983     for(i=0; i<mysize; i++)
2984     {
2985         if(i==myrank)
2986             continue;
2987         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2988         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2989         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2990         smsg_attr[i].mbox_offset = i*smsg_memlen;
2991         smsg_attr[i].buff_size = smsg_memlen;
2992         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2993         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2994     }
2995
2996     for(i=0; i<mysize; i++)
2997     {
2998         if (myrank == i) continue;
2999
3000         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3001         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3002         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3003         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3004         remote_smsg_attr.buff_size = smsg_memlen;
3005         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3006         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3007
3008         /* initialize the smsg channel */
3009         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3010         GNI_RC_CHECK("SMSG Init", status);
3011     } //end initialization
3012
3013     free(base_addr_vec);
3014     free(smsg_attr);
3015
3016     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3017     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3018
3019
3020 inline
3021 static void _init_send_queue(SMSG_QUEUE *queue)
3022 {
3023      int i;
3024 #if ONE_SEND_QUEUE
3025      queue->sendMsgBuf = PCQueueCreate();
3026      destpe_avail = (char*)malloc(mysize * sizeof(char));
3027 #else
3028      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3029 #if CMK_SMP && SMP_LOCKS
3030      nonEmptyQueues = PCQueueCreate();
3031 #endif
3032      for(i =0; i<mysize; i++)
3033      {
3034 #if CMK_SMP
3035          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3036 #if SMP_LOCKS
3037          queue->smsg_msglist_index[i].pushed = 0;
3038          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3039 #endif
3040 #else
3041          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3042          queue->smsg_msglist_index[i].next = -1;
3043          queue->smsg_head_index = -1;
3044 #endif
3045         
3046      }
3047 #endif
3048 }
3049
3050 inline
3051 static void _init_smsg()
3052 {
3053     if(mysize > 1) {
3054         if (useDynamicSMSG)
3055             _init_dynamic_smsg();
3056         else
3057             _init_static_smsg();
3058     }
3059
3060     _init_send_queue(&smsg_queue);
3061 #if CMK_USE_OOB
3062     _init_send_queue(&smsg_oob_queue);
3063 #endif
3064 }
3065
3066 static void _init_static_msgq()
3067 {
3068     gni_return_t status;
3069     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3070     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3071     msgq_attrs.smsg_q_sz = 1;
3072     msgq_attrs.rcv_pool_sz = 1;
3073     msgq_attrs.num_msgq_eps = 2;
3074     msgq_attrs.nloc_insts = 8;
3075     msgq_attrs.modes = 0;
3076     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3077
3078     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3079     GNI_RC_CHECK("MSGQ Init", status);
3080
3081
3082 }
3083
3084
3085 static CmiUInt8 total_mempool_size = 0;
3086 static CmiUInt8 total_mempool_calls = 0;
3087
3088 #if USE_LRTS_MEMPOOL
3089 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3090 {
3091     void *pool;
3092     int ret;
3093     gni_return_t status = GNI_RC_SUCCESS;
3094
3095     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3096     if (*size < default_size) *size = default_size;
3097 #if LARGEPAGE
3098     // round up to be multiple of _tlbpagesize
3099     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3100     *size = ALIGNHUGEPAGE(*size);
3101 #endif
3102     total_mempool_size += *size;
3103     total_mempool_calls += 1;
3104 #if   !LARGEPAGE
3105     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3106     {
3107         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3108         CmiAbort("alloc_mempool_block");
3109     }
3110 #endif
3111 #if LARGEPAGE
3112     pool = my_get_huge_pages(*size);
3113     ret = pool==NULL;
3114 #else
3115     ret = posix_memalign(&pool, ALIGNBUF, *size);
3116 #endif
3117     if (ret != 0) {
3118 #if CMK_SMP && STEAL_MEMPOOL
3119       pool = steal_mempool_block(size, mem_hndl);
3120       if (pool != NULL) return pool;
3121 #endif
3122       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3123       if (ret == ENOMEM)
3124         CmiAbort("alloc_mempool_block: out of memory.");
3125       else
3126         CmiAbort("alloc_mempool_block: posix_memalign failed");
3127     }
3128 #if LARGEPAGE
3129     CmiMemLock();
3130     register_count++;
3131     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3132     CmiMemUnlock();
3133     if(status != GNI_RC_SUCCESS) {
3134         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3135 sweep_mempool(CpvAccess(mempool));
3136     }
3137     GNI_RC_CHECK("MEMORY_REGISTER", status);
3138 #else
3139     SetMemHndlZero((*mem_hndl));
3140 #endif
3141     return pool;
3142 }
3143
3144 // ptr is a block head pointer
3145 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3146 {
3147     if(!(IsMemHndlZero(mem_hndl)))
3148     {
3149         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3150     }
3151 #if LARGEPAGE
3152     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3153 #else
3154     free(ptr);
3155 #endif
3156 }
3157 #endif
3158
3159 void LrtsPreCommonInit(int everReturn){
3160 #if USE_LRTS_MEMPOOL
3161     CpvInitialize(mempool_type*, mempool);
3162     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3163     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3164 #endif
3165 }
3166
3167 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3168 {
3169     register int            i;
3170     int                     rc;
3171     int                     device_id = 0;
3172     unsigned int            remote_addr;
3173     gni_cdm_handle_t        cdm_hndl;
3174     gni_return_t            status = GNI_RC_SUCCESS;
3175     uint32_t                vmdh_index = -1;
3176     uint8_t                 ptag;
3177     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3178     int                     first_spawned;
3179     int                     physicalID;
3180     char                   *env;
3181
3182     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3183     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3184     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3185    
3186     status = PMI_Init(&first_spawned);
3187     GNI_RC_CHECK("PMI_Init", status);
3188
3189     status = PMI_Get_size(&mysize);
3190     GNI_RC_CHECK("PMI_Getsize", status);
3191
3192     status = PMI_Get_rank(&myrank);
3193     GNI_RC_CHECK("PMI_getrank", status);
3194
3195     //physicalID = CmiPhysicalNodeID(myrank);
3196     
3197     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3198
3199     *myNodeID = myrank;
3200     *numNodes = mysize;
3201   
3202 #if MULTI_THREAD_SEND
3203     /* Currently, we only consider the case that comm. thread will only recv msgs */
3204     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3205 #endif
3206
3207     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3208     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3209     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3210
3211     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3212     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3213     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3214
3215     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3216     if (env) useDynamicSMSG = 1;
3217     if (!useDynamicSMSG)
3218       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3219     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3220     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3221     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3222     
3223     if(myrank == 0)
3224     {
3225         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3226         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3227     }
3228 #ifdef USE_ONESIDED
3229     onesided_init(NULL, &onesided_hnd);
3230
3231     // this is a GNI test, so use the libonesided bypass functionality
3232     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3233     local_addr = gniGetNicAddress();
3234 #else
3235     ptag = get_ptag();
3236     cookie = get_cookie();
3237 #if 0
3238     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3239 #endif
3240     //Create and attach to the communication  domain */
3241     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3242     GNI_RC_CHECK("GNI_CdmCreate", status);
3243     //* device id The device id is the minor number for the device
3244     //that is assigned to the device by the system when the device is created.
3245     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3246     //where X is the device number 0 default 
3247     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3248     GNI_RC_CHECK("GNI_CdmAttach", status);
3249     local_addr = get_gni_nic_address(0);
3250 #endif
3251     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3252     _MEMCHECK(MPID_UGNI_AllAddr);
3253     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3254     /* create the local completion queue */
3255     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3256     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3257     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3258     
3259     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3260     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3261     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3262
3263     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3264     GNI_RC_CHECK("Create CQ (rx)", status);
3265     
3266     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3267     GNI_RC_CHECK("Create Post CQ (rx)", status);
3268     
3269     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3270     //GNI_RC_CHECK("Create BTE CQ", status);
3271
3272     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3273     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3274     _MEMCHECK(ep_hndl_array);
3275 #if MULTI_THREAD_SEND 
3276     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3277     //default_tx_cq_lock = CmiCreateLock();
3278     rdma_tx_cq_lock = CmiCreateLock();
3279     smsg_rx_cq_lock = CmiCreateLock();
3280     //global_gni_lock  = CmiCreateLock();
3281     //rx_cq_lock  = CmiCreateLock();
3282 #endif
3283     for (i=0; i<mysize; i++) {
3284         if(i == myrank) continue;
3285         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3286         GNI_RC_CHECK("GNI_EpCreate ", status);   
3287         remote_addr = MPID_UGNI_AllAddr[i];
3288         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3289         GNI_RC_CHECK("GNI_EpBind ", status);   
3290     }
3291
3292     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3293     _init_smsg();
3294     PMI_Barrier();
3295
3296 #if     USE_LRTS_MEMPOOL
3297     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3298     if (env) {
3299         _totalmem = CmiReadSize(env);
3300         if (myrank == 0)
3301             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3302     }
3303
3304     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3305     if (env) _mempool_size = CmiReadSize(env);
3306     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3307         _mempool_size = CmiReadSize(env);
3308
3309
3310     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3311     if (env) {
3312         MAX_REG_MEM = CmiReadSize(env);
3313         user_set_flag = 1;
3314     }
3315     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3316         MAX_REG_MEM = CmiReadSize(env);
3317         user_set_flag = 1;
3318     }
3319
3320     env = getenv("CHARM_UGNI_SEND_MAX");
3321     if (env) {
3322         MAX_BUFF_SEND = CmiReadSize(env);
3323         user_set_flag = 1;
3324     }
3325     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3326         MAX_BUFF_SEND = CmiReadSize(env);
3327         user_set_flag = 1;
3328     }
3329
3330     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3331     if (env) {
3332         _mempool_size_limit = CmiReadSize(env);
3333     }
3334
3335     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3336     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3337
3338     if (myrank==0) {
3339         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3340         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3341         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3342             /* memblock can expand to BIG_MSG * 2 size */
3343             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3344             CmiAbort("mempool maximum size is too small. \n");
3345         }
3346 #if MULTI_THREAD_SEND
3347         printf("Charm++> worker thread sending messages\n");
3348 #elif COMM_THREAD_SEND
3349         printf("Charm++> only comm thread send/recv messages\n");
3350 #endif
3351     }
3352
3353 #endif     /* end of USE_LRTS_MEMPOOL */
3354
3355     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3356     if (env) {
3357         BIG_MSG = CmiReadSize(env);
3358         if (BIG_MSG < ONE_SEG)
3359           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3360     }
3361     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3362     if (env) {
3363         BIG_MSG_PIPELINE = atoi(env);
3364     }
3365
3366     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3367     if (env) _checkProgress = 0;
3368     if (mysize == 1) _checkProgress = 0;
3369
3370     
3371     /*
3372     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3373     if (env) 
3374         _tlbpagesize = CmiReadSize(env);
3375     */
3376     /* real gethugepagesize() is only available when hugetlb module linked */
3377     _tlbpagesize = gethugepagesize();
3378     if (myrank == 0) {
3379         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3380     }
3381
3382 #if LARGEPAGE
3383     if (_tlbpagesize == 4096) {
3384         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3385     }
3386 #endif
3387
3388     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3389
3390     /* init DMA buffer for medium message */
3391
3392     //_init_DMA_buffer();
3393     
3394     free(MPID_UGNI_AllAddr);
3395 #if CMK_SMP
3396     sendRdmaBuf = PCQueueCreate();
3397 #else
3398     sendRdmaBuf = 0;
3399 #endif
3400 #if MACHINE_DEBUG_LOG
3401     char ln[200];
3402     sprintf(ln,"debugLog.%d",myrank);
3403     debugLog=fopen(ln,"w");
3404 #endif
3405
3406 //    NTK_Init();
3407 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3408
3409 #if  REMOTE_EVENT
3410     AckPool_init();
3411 #endif
3412
3413 #if CMK_WITH_STATS
3414     init_comm_stats();
3415 #endif
3416 }
3417
3418 void* LrtsAlloc(int n_bytes, int header)
3419 {
3420     void *ptr = NULL;
3421 #if 0
3422     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3423 #endif
3424     if(n_bytes <= SMSG_MAX_MSG)
3425     {
3426         int totalsize = n_bytes+header;
3427         ptr = malloc(totalsize);
3428     }
3429     else {
3430         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3431 #if     USE_LRTS_MEMPOOL
3432         n_bytes = ALIGN64(n_bytes);
3433         if(n_bytes < BIG_MSG)
3434         {
3435             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3436             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3437         }else 
3438         {
3439 #if LARGEPAGE
3440             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3441             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3442             char *res = my_get_huge_pages(n_bytes);
3443 #else
3444             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3445 #endif
3446             if (res) ptr = res + ALIGNBUF - header;
3447         }
3448 #else
3449         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3450         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3451         ptr = res + ALIGNBUF - header;
3452 #endif
3453     }
3454     return ptr;
3455 }
3456
3457 void  LrtsFree(void *msg)
3458 {
3459     CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3460     if (size <= SMSG_MAX_MSG)
3461         free(msg);
3462     else {
3463         size = ALIGN64(size);
3464         if(size>=BIG_MSG)
3465         {
3466 #if LARGEPAGE
3467             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3468             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3469 #else
3470             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3471 #endif
3472         }
3473         else {
3474 #if    USE_LRTS_MEMPOOL
3475 #if CMK_SMP
3476             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3477 #else
3478             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3479 #endif
3480 #else
3481             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3482 #endif
3483         }
3484     }
3485 }
3486
3487 void LrtsExit()
3488 {
3489 #if CMK_WITH_STATS
3490     if (print_stats) print_comm_stats();
3491 #endif