ac302bbcca48825b5cbaa36b3c008775f4a408d7
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #define     LARGEPAGE              0
51
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           1
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     0
59 #endif
60
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
64 #define CMI_EXERT_SEND_CAP      0
65 #define CMI_EXERT_RECV_CAP      0
66 #define CMI_EXERT_RDMA_CAP      0
67
68 #if CMI_EXERT_SEND_CAP
69 #define SEND_CAP  32
70 #endif
71
72 #if CMI_EXERT_RECV_CAP
73 #define RECV_CAP  4                  /* cap <= 2 sometimes hang */
74 #endif
75
76 #if CMI_EXERT_RDMA_CAP
77 int   RDMA_cap =   100;
78 int   RDMA_pending = 0;
79 #endif
80
81 #define USE_LRTS_MEMPOOL                  1
82
83 #define PRINT_SYH                         0
84
85 // Trace communication thread
86 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
87 #define TRACE_THRESHOLD     0.00005
88 #define CMI_MPI_TRACE_MOREDETAILED 0
89 #undef CMI_MPI_TRACE_USEREVENTS
90 #define CMI_MPI_TRACE_USEREVENTS 1
91 #else
92 #undef CMK_SMP_TRACE_COMMTHREAD
93 #define CMK_SMP_TRACE_COMMTHREAD 0
94 #endif
95
96 #define CMK_TRACE_COMMOVERHEAD 0
97 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
98 #undef CMI_MPI_TRACE_USEREVENTS
99 #define CMI_MPI_TRACE_USEREVENTS 1
100 #else
101 #undef CMK_TRACE_COMMOVERHEAD
102 #define CMK_TRACE_COMMOVERHEAD 0
103 #endif
104
105 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
106 CpvStaticDeclare(double, projTraceStart);
107 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
108 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
109 #else
110 #define  START_EVENT()
111 #define  END_EVENT(x)
112 #endif
113
114 #if USE_LRTS_MEMPOOL
115
116 #define oneMB (1024ll*1024)
117 #define oneGB (1024ll*1024*1024)
118
119 static CmiInt8 _mempool_size = 8*oneMB;
120 static CmiInt8 _expand_mem =  4*oneMB;
121 static CmiInt8 _mempool_size_limit = 0;
122
123 static CmiInt8 _totalmem = 0.8*oneGB;
124
125 #if LARGEPAGE
126 static CmiInt8 BIG_MSG  =  16*oneMB;
127 static CmiInt8 ONE_SEG  =  4*oneMB;
128 #else
129 static CmiInt8 BIG_MSG  =  4*oneMB;
130 static CmiInt8 ONE_SEG  =  2*oneMB;
131 #endif
132 #if MULTI_THREAD_SEND
133 static int BIG_MSG_PIPELINE = 1;
134 #else
135 static int BIG_MSG_PIPELINE = 4;
136 #endif
137
138 // dynamic flow control
139 static CmiInt8 buffered_send_msg = 0;
140 static CmiInt8 register_memory_size = 0;
141
142 #if LARGEPAGE
143 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
144 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
145 static CmiInt8  register_count = 0;
146 #else
147 #if CMK_SMP && COMM_THREAD_SEND 
148 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
149 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
150 #else
151 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
152 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
153 #endif
154
155
156 #endif
157
158 #endif     /* end USE_LRTS_MEMPOOL */
159
160 #if MULTI_THREAD_SEND 
161 #define     CMI_GNI_LOCK(x)       CmiLock(x);
162 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
163 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
164 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
165 #else
166 #define     CMI_GNI_LOCK(x)
167 #define     CMI_GNI_UNLOCK(x)
168 #define     CMI_PCQUEUEPOP_LOCK(Q)   
169 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
170 #endif
171
172 static int _tlbpagesize = 4096;
173
174 //static int _smpd_count  = 0;
175
176 static int   user_set_flag  = 0;
177
178 static int _checkProgress = 1;             /* check deadlock */
179 static int _detected_hang = 0;
180
181 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
182
183 // dynamic SMSG
184 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
185
186 static int avg_smsg_connection = 32;
187 static int                 *smsg_connected_flag= 0;
188 static gni_smsg_attr_t     **smsg_attr_vector_local;
189 static gni_smsg_attr_t     **smsg_attr_vector_remote;
190 static gni_ep_handle_t     ep_hndl_unbound;
191 static gni_smsg_attr_t     send_smsg_attr;
192 static gni_smsg_attr_t     recv_smsg_attr;
193
194 typedef struct _dynamic_smsg_mailbox{
195    void     *mailbox_base;
196    int      size;
197    int      offset;
198    gni_mem_handle_t  mem_hndl;
199    struct      _dynamic_smsg_mailbox  *next;
200 }dynamic_smsg_mailbox_t;
201
202 static dynamic_smsg_mailbox_t  *mailbox_list;
203
204 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
205 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
206
207 #if PRINT_SYH
208 int         lrts_send_msg_id = 0;
209 int         lrts_local_done_msg = 0;
210 int         lrts_send_rdma_success = 0;
211 #endif
212
213 #include "machine.h"
214
215 #include "pcqueue.h"
216
217 #include "mempool.h"
218
219 #if CMK_PERSISTENT_COMM
220 #include "machine-persistent.h"
221 #endif
222
223 //#define  USE_ONESIDED 1
224 #ifdef USE_ONESIDED
225 //onesided implementation is wrong, since no place to restore omdh
226 #include "onesided.h"
227 onesided_hnd_t   onesided_hnd;
228 onesided_md_t    omdh;
229 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
230
231 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
232
233 #else
234 uint8_t   onesided_hnd, omdh;
235
236 #if REMOTE_EVENT || CQWRITE 
237 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
238     if(register_memory_size+size>= MAX_REG_MEM) { \
239         status = GNI_RC_ERROR_NOMEM;} \
240     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
241         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
242 #else
243 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
244         if (register_memory_size + size >= MAX_REG_MEM) { \
245             status = GNI_RC_ERROR_NOMEM; \
246         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
247             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
248 #endif
249
250 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
251     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
252              register_memory_size -= size; \
253          else CmiAbort("MEM_DEregister");  \
254     } while (0)
255 #endif
256
257 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
258 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
259 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
260 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
261 #define   IncreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)++
262 #define   DecreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)--
263 #define   IncreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)++
264 #define   DecreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)--
265 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
266 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
267 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
268 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
269 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
270 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
271
272 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
273 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
274
275 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
276 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
277 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
278 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
279
280 #define ALIGNBUF                64
281
282 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
283 /* If SMSG is not used */
284
285 #define FMA_PER_CORE  1024
286 #define FMA_BUFFER_SIZE 1024
287
288 /* If SMSG is used */
289 static int  SMSG_MAX_MSG = 1024;
290 #define SMSG_MAX_CREDIT    72
291
292 #define MSGQ_MAXSIZE       2048
293
294 /* large message transfer with FMA or BTE */
295 #if ! REMOTE_EVENT
296 #define LRTS_GNI_RDMA_THRESHOLD  1024 
297 #else
298    /* remote events only work with RDMA */
299 #define LRTS_GNI_RDMA_THRESHOLD  0 
300 #endif
301
302 #if CMK_SMP
303 static int  REMOTE_QUEUE_ENTRIES=163840; 
304 static int LOCAL_QUEUE_ENTRIES=163840; 
305 #else
306 static int  REMOTE_QUEUE_ENTRIES=20480;
307 static int LOCAL_QUEUE_ENTRIES=20480; 
308 #endif
309
310 #define BIG_MSG_TAG             0x26
311 #define PUT_DONE_TAG            0x28
312 #define DIRECT_PUT_DONE_TAG     0x29
313 #define ACK_TAG                 0x30
314 /* SMSG is data message */
315 #define SMALL_DATA_TAG          0x31
316 /* SMSG is a control message to initialize a BTE */
317 #define LMSG_INIT_TAG           0x39 
318
319 #define DEBUG
320 #ifdef GNI_RC_CHECK
321 #undef GNI_RC_CHECK
322 #endif
323 #ifdef DEBUG
324 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
325 #else
326 #define GNI_RC_CHECK(msg,rc)
327 #endif
328
329 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
330 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
331 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
332
333 static int useStaticMSGQ = 0;
334 static int useStaticFMA = 0;
335 static int mysize, myrank;
336 static gni_nic_handle_t   nic_hndl;
337
338 typedef struct {
339     gni_mem_handle_t mdh;
340     uint64_t addr;
341 } mdh_addr_t ;
342 // this is related to dynamic SMSG
343
344 typedef struct mdh_addr_list{
345     gni_mem_handle_t mdh;
346    void *addr;
347     struct mdh_addr_list *next;
348 }mdh_addr_list_t;
349
350 static unsigned int         smsg_memlen;
351 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
352 mdh_addr_t          setup_mem;
353 mdh_addr_t          *smsg_connection_vec = 0;
354 gni_mem_handle_t    smsg_connection_memhndl;
355 static int          smsg_expand_slots = 10;
356 static int          smsg_available_slot = 0;
357 static void         *smsg_mailbox_mempool = 0;
358 mdh_addr_list_t     *smsg_dynamic_list = 0;
359
360 static void             *smsg_mailbox_base;
361 gni_msgq_attr_t         msgq_attrs;
362 gni_msgq_handle_t       msgq_handle;
363 gni_msgq_ep_attr_t      msgq_ep_attrs;
364 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
365
366 /* =====Beginning of Declarations of Machine Specific Variables===== */
367 static int cookie;
368 static int modes = 0;
369 static gni_cq_handle_t       smsg_rx_cqh = NULL;
370 static gni_cq_handle_t       default_tx_cqh = NULL;
371 static gni_cq_handle_t       rdma_tx_cqh = NULL;
372 static gni_cq_handle_t       rdma_rx_cqh = NULL;
373 static gni_cq_handle_t       post_tx_cqh = NULL;
374 static gni_ep_handle_t       *ep_hndl_array;
375
376 static CmiNodeLock           *ep_lock_array;
377 static CmiNodeLock           default_tx_cq_lock; 
378 static CmiNodeLock           rdma_tx_cq_lock; 
379 static CmiNodeLock           global_gni_lock; 
380 static CmiNodeLock           rx_cq_lock;
381 static CmiNodeLock           smsg_mailbox_lock;
382 static CmiNodeLock           smsg_rx_cq_lock;
383 static CmiNodeLock           *mempool_lock;
384 //#define     CMK_WITH_STATS      1
385 typedef struct msg_list
386 {
387     uint32_t destNode;
388     uint32_t size;
389     void *msg;
390     uint8_t tag;
391 #if !CMK_SMP
392     struct msg_list *next;
393 #endif
394 #if CMK_WITH_STATS
395     double  creation_time;
396 #endif
397 }MSG_LIST;
398
399
400 typedef struct control_msg
401 {
402     uint64_t            source_addr;    /* address from the start of buffer  */
403     uint64_t            dest_addr;      /* address from the start of buffer */
404     int                 total_length;   /* total length */
405     int                 length;         /* length of this packet */
406 #if REMOTE_EVENT
407     int                 ack_index;      /* index from integer to address */
408 #endif
409     uint8_t             seq_id;         //big message   0 meaning single message
410     gni_mem_handle_t    source_mem_hndl;
411     struct control_msg *next;
412 } CONTROL_MSG;
413
414 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
415
416 typedef struct ack_msg
417 {
418     uint64_t            source_addr;    /* address from the start of buffer  */
419 #if ! USE_LRTS_MEMPOOL
420     gni_mem_handle_t    source_mem_hndl;
421     int                 length;          /* total length */
422 #endif
423     struct ack_msg     *next;
424 } ACK_MSG;
425
426 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
427
428 #if CMK_DIRECT
429 typedef struct{
430     uint64_t    handler_addr;
431 }CMK_DIRECT_HEADER;
432
433 typedef struct {
434     char core[CmiMsgHeaderSizeBytes];
435     uint64_t handler;
436 }cmidirectMsg;
437
438 //SYH
439 CpvDeclare(int, CmiHandleDirectIdx);
440 void CmiHandleDirectMsg(cmidirectMsg* msg)
441 {
442
443     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
444    (*(_handle->callbackFnPtr))(_handle->callbackData);
445    CmiFree(msg);
446 }
447
448 void CmiDirectInit()
449 {
450     CpvInitialize(int,  CmiHandleDirectIdx);
451     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
452 }
453
454 #endif
455 typedef struct  rmda_msg
456 {
457     int                   destNode;
458 #if REMOTE_EVENT
459     int                   ack_index;
460 #endif
461     gni_post_descriptor_t *pd;
462 #if !CMK_SMP
463     struct  rmda_msg      *next;
464 #endif
465 }RDMA_REQUEST;
466
467
468 #if CMK_SMP
469 #define SMP_LOCKS               0
470 #define ONE_SEND_QUEUE                  0
471 PCQueue sendRdmaBuf;
472 typedef struct  msg_list_index
473 {
474     PCQueue     sendSmsgBuf;
475     int         pushed;
476     CmiNodeLock   lock;
477 } MSG_LIST_INDEX;
478 char                *destpe_avail;
479 #if  !ONE_SEND_QUEUE && SMP_LOCKS
480     PCQueue     nonEmptyQueues;
481 #endif
482 #else         /* non-smp */
483
484 static RDMA_REQUEST        *sendRdmaBuf = 0;
485 static RDMA_REQUEST        *sendRdmaTail = 0;
486 typedef struct  msg_list_index
487 {
488     int         next;
489     MSG_LIST    *sendSmsgBuf;
490     MSG_LIST    *tail;
491 } MSG_LIST_INDEX;
492
493 #endif
494
495 // buffered send queue
496 #if ! ONE_SEND_QUEUE
497 typedef struct smsg_queue
498 {
499     MSG_LIST_INDEX   *smsg_msglist_index;
500     int               smsg_head_index;
501 } SMSG_QUEUE;
502 #else
503 typedef struct smsg_queue
504 {
505     PCQueue       sendMsgBuf;
506 }  SMSG_QUEUE;
507 #endif
508
509 SMSG_QUEUE                  smsg_queue;
510 #if CMK_USE_OOB
511 SMSG_QUEUE                  smsg_oob_queue;
512 #endif
513
514 #if CMK_SMP
515
516 #define FreeMsgList(d)   free(d);
517 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
518
519 #else
520
521 static MSG_LIST       *msglist_freelist=0;
522
523 #define FreeMsgList(d)  \
524   do { \
525   (d)->next = msglist_freelist;\
526   msglist_freelist = d; \
527   } while (0)
528
529 #define MallocMsgList(d) \
530   do {  \
531   d = msglist_freelist;\
532   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
533              _MEMCHECK(d);\
534   } else msglist_freelist = d->next; \
535   d->next =0;  \
536   } while (0)
537
538 #endif
539
540 #if CMK_SMP
541
542 #define FreeControlMsg(d)      free(d);
543 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
544
545 #else
546
547 static CONTROL_MSG    *control_freelist=0;
548
549 #define FreeControlMsg(d)       \
550   do { \
551   (d)->next = control_freelist;\
552   control_freelist = d; \
553   } while (0);
554
555 #define MallocControlMsg(d) \
556   do {  \
557   d = control_freelist;\
558   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
559              _MEMCHECK(d);\
560   } else control_freelist = d->next;  \
561   } while (0);
562
563 #endif
564
565 #if CMK_SMP
566
567 #define FreeAckMsg(d)      free(d);
568 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
569
570 #else
571
572 static ACK_MSG        *ack_freelist=0;
573
574 #define FreeAckMsg(d)       \
575   do { \
576   (d)->next = ack_freelist;\
577   ack_freelist = d; \
578   } while (0)
579
580 #define MallocAckMsg(d) \
581   do { \
582   d = ack_freelist;\
583   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
584              _MEMCHECK(d);\
585   } else ack_freelist = d->next; \
586   } while (0)
587
588 #endif
589
590
591 # if CMK_SMP
592 #define FreeRdmaRequest(d)       free(d);
593 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
594 #else
595
596 static RDMA_REQUEST         *rdma_freelist = NULL;
597
598 #define FreeRdmaRequest(d)       \
599   do {  \
600   (d)->next = rdma_freelist;\
601   rdma_freelist = d;    \
602   } while (0)
603
604 #define MallocRdmaRequest(d) \
605   do {   \
606   d = rdma_freelist;\
607   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
608              _MEMCHECK(d);\
609   } else rdma_freelist = d->next; \
610   d->next =0;   \
611   } while (0)
612 #endif
613
614 /* reuse gni_post_descriptor_t */
615 static gni_post_descriptor_t *post_freelist=0;
616
617 #if  !CMK_SMP
618 #define FreePostDesc(d)       \
619     (d)->next_descr = post_freelist;\
620     post_freelist = d;
621
622 #define MallocPostDesc(d) \
623   d = post_freelist;\
624   if (d==0) { \
625      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
626      d->next_descr = 0;\
627       _MEMCHECK(d);\
628   } else post_freelist = d->next_descr;
629 #else
630
631 #define FreePostDesc(d)     free(d);
632 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
633
634 #endif
635
636
637 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
638 static int      buffered_smsg_counter = 0;
639
640 /* SmsgSend return success but message sent is not confirmed by remote side */
641 static MSG_LIST *buffered_fma_head = 0;
642 static MSG_LIST *buffered_fma_tail = 0;
643
644 /* functions  */
645 #define IsFree(a,ind)  !( a& (1<<(ind) ))
646 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
647 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
648
649 CpvDeclare(mempool_type*, mempool);
650
651 #if REMOTE_EVENT
652 /* ack pool for remote events */
653
654 static int  SHIFT   =           18;
655 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
656 #define RANK_MASK               ((1<<SHIFT) - 1)
657 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
658
659 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
660 #define GET_RANK(evt)           ((evt) & RANK_MASK)
661 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
662
663 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
664
665 #if CMK_SMP
666 #define INIT_SIZE                4096
667 #else
668 #define INIT_SIZE                1024
669 #endif
670
671 struct IndexStruct {
672 void *addr;
673 int next;
674 int type;     // 1: ACK   2: Persistent
675 };
676
677 typedef struct IndexPool {
678     struct IndexStruct   *indexes;
679     int                   size;
680     int                   freehead;
681     CmiNodeLock           lock;
682 } IndexPool;
683
684 static IndexPool  ackPool;
685 #if CMK_PERSISTENT_COMM
686 static IndexPool  persistPool;
687 #endif
688
689 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
690 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
691
692 static void IndexPool_init(IndexPool *pool)
693 {
694     int i;
695     if ((1<<SHIFT) < mysize) 
696         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
697     pool->size = INIT_SIZE;
698     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
699     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
700     for (i=0; i<pool->size-1; i++) {
701         pool->indexes[i].next = i+1;
702         pool->indexes[i].type = 0;
703     }
704     pool->indexes[i].next = -1;
705     pool->freehead = 0;
706 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
707     pool->lock  = CmiCreateLock();
708 #else
709     pool->lock  = 0;
710 #endif
711 }
712
713 static
714 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
715 {
716     int i;
717     int s;
718 #if MULTI_THREAD_SEND  
719     CmiLock(pool->lock);
720 #endif
721     s = pool->freehead;
722     if (s == -1) {
723         int newsize = pool->size * 2;
724         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
725         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
726         struct IndexStruct *old_ackpool = pool->indexes;
727         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
728         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
729         for (i=pool->size; i<newsize-1; i++) {
730             pool->indexes[i].next = i+1;
731             pool->indexes[i].type = 0;
732         }
733         pool->indexes[i].next = -1;
734         pool->indexes[i].type = 0;
735         pool->freehead = pool->size;
736         s = pool->size;
737         pool->size = newsize;
738         free(old_ackpool);
739     }
740     pool->freehead = pool->indexes[s].next;
741     pool->indexes[s].addr = addr;
742     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
743     pool->indexes[s].type = type;
744 #if MULTI_THREAD_SEND
745     CmiUnlock(pool->lock);
746 #endif
747     return s;
748 }
749
750 static
751 inline  void IndexPool_freeslot(IndexPool *pool, int s)
752 {
753     CmiAssert(s>=0 && s<pool->size);
754 #if MULTI_THREAD_SEND
755     CmiLock(pool->lock);
756 #endif
757     pool->indexes[s].next = pool->freehead;
758     pool->indexes[s].type = 0;
759     pool->freehead = s;
760 #if MULTI_THREAD_SEND
761     CmiUnlock(pool->lock);
762 #endif
763 }
764
765
766 #endif
767
768 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
769 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
770 #define CHARM_MAGIC_NUMBER               126
771
772 #if CMK_ERROR_CHECKING
773 extern unsigned char computeCheckSum(unsigned char *data, int len);
774 static int checksum_flag = 0;
775 #define CMI_SET_CHECKSUM(msg, len)      \
776         if (checksum_flag)  {   \
777           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
778           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
779         }
780 #define CMI_CHECK_CHECKSUM(msg, len)    \
781         if (checksum_flag)      \
782           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
783             CmiAbort("Fatal error: checksum doesn't agree!\n");
784 #else
785 #define CMI_SET_CHECKSUM(msg, len)
786 #define CMI_CHECK_CHECKSUM(msg, len)
787 #endif
788 /* =====End of Definitions of Message-Corruption Related Macros=====*/
789
790 static int print_stats = 0;
791 static int stats_off = 0;
792 void CmiTurnOnStats()
793 {
794     stats_off = 0;
795 }
796
797 void CmiTurnOffStats()
798 {
799     stats_off = 1;
800 }
801
802 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
803
804 #if CMK_WITH_STATS
805 FILE *counterLog = NULL;
806 typedef struct comm_thread_stats
807 {
808     uint64_t  smsg_data_count;
809     uint64_t  lmsg_init_count;
810     uint64_t  ack_count;
811     uint64_t  big_msg_ack_count;
812     uint64_t  smsg_count;
813     uint64_t  direct_put_done_count;
814     uint64_t  put_done_count;
815     //times of calling SmsgSend
816     uint64_t  try_smsg_data_count;
817     uint64_t  try_lmsg_init_count;
818     uint64_t  try_ack_count;
819     uint64_t  try_big_msg_ack_count;
820     uint64_t  try_direct_put_done_count;
821     uint64_t  try_put_done_count;
822     uint64_t  try_smsg_count;
823     
824     double    max_time_in_send_buffered_smsg;
825     double    all_time_in_send_buffered_smsg;
826
827     uint64_t  rdma_get_count, rdma_put_count;
828     uint64_t  try_rdma_get_count, try_rdma_put_count;
829     double    max_time_from_control_to_rdma_init;
830     double    all_time_from_control_to_rdma_init;
831
832     double    max_time_from_rdma_init_to_rdma_done;
833     double    all_time_from_rdma_init_to_rdma_done;
834
835     int      count_in_PumpNetwork;
836     double   time_in_PumpNetwork;
837     double   max_time_in_PumpNetwork;
838     int      count_in_SendBufferMsg_smsg;
839     double   time_in_SendBufferMsg_smsg;
840     double   max_time_in_SendBufferMsg_smsg;
841     int      count_in_SendRdmaMsg;
842     double   time_in_SendRdmaMsg;
843     double   max_time_in_SendRdmaMsg;
844     int      count_in_PumpRemoteTransactions;
845     double   time_in_PumpRemoteTransactions;
846     double   max_time_in_PumpRemoteTransactions;
847     int      count_in_PumpLocalTransactions_rdma;
848     double   time_in_PumpLocalTransactions_rdma;
849     double   max_time_in_PumpLocalTransactions_rdma;
850 } Comm_Thread_Stats;
851
852 static Comm_Thread_Stats   comm_stats;
853
854 static void init_comm_stats()
855 {
856   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
857   if (print_stats){
858       char ln[200];
859       int code = mkdir("counters", 00777); 
860       sprintf(ln,"counters/statistics.%d.%d", mysize, myrank);
861       counterLog=fopen(ln,"w");
862   }
863 }
864
865 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
866
867 #define SMSG_SENT_DONE(creation_time, tag)  \
868         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
869             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
870             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
871             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
872             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
873             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
874             comm_stats.smsg_count++; \
875             double inbuff_time = CmiWallTimer() - creation_time;   \
876             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
877             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
878         }
879
880 #define SMSG_TRY_SEND(tag)  \
881         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
882             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
883             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
884             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
885             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
886             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
887             comm_stats.try_smsg_count++; \
888         }
889
890 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
891
892 #define  RDMA_TRANS_DONE(x)      \
893          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
894              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
895              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
896          }
897
898 #define  RDMA_TRANS_INIT(type, x)      \
899          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
900              double rdma_trans_time = CmiWallTimer() - x ; \
901              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
902              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
903          }
904
905 #define STATS_PUMPNETWORK_TIME(x)   \
906         { double t = CmiWallTimer(); \
907           x;        \
908           t = CmiWallTimer() - t;          \
909           comm_stats.count_in_PumpNetwork++;        \
910           comm_stats.time_in_PumpNetwork += t;   \
911           if (t>comm_stats.max_time_in_PumpNetwork)      \
912               comm_stats.max_time_in_PumpNetwork = t;    \
913         }
914
915 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
916         { double t = CmiWallTimer(); \
917           x;        \
918           t = CmiWallTimer() - t;          \
919           comm_stats.count_in_PumpRemoteTransactions ++;        \
920           comm_stats.time_in_PumpRemoteTransactions += t;   \
921           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
922               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
923         }
924
925 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
926         { double t = CmiWallTimer(); \
927           x;        \
928           t = CmiWallTimer() - t;          \
929           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
930           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
931           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
932               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
933         }
934
935 #define STATS_SEND_SMSGS_TIME(x)   \
936         { double t = CmiWallTimer(); \
937           x;        \
938           t = CmiWallTimer() - t;          \
939           comm_stats.count_in_SendBufferMsg_smsg ++;        \
940           comm_stats.time_in_SendBufferMsg_smsg += t;   \
941           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
942               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
943         }
944
945 #define STATS_SENDRDMAMSG_TIME(x)   \
946         { double t = CmiWallTimer(); \
947           x;        \
948           t = CmiWallTimer() - t;          \
949           comm_stats.count_in_SendRdmaMsg ++;        \
950           comm_stats.time_in_SendRdmaMsg += t;   \
951           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
952               comm_stats.max_time_in_SendRdmaMsg = t;    \
953         }
954
955 static void print_comm_stats()
956 {
957     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
958     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
959             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
960             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
961     
962     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
963             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
964             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
965
966     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
967     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
968     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
969
970
971     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
972     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
973     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
974     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
975     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
976     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
977 }
978
979 #else
980 #define STATS_PUMPNETWORK_TIME(x)                  x
981 #define STATS_SEND_SMSGS_TIME(x)                   x
982 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
983 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
984 #define STATS_SENDRDMAMSG_TIME(x)                  x
985 #endif
986
987 static void
988 allgather(void *in,void *out, int len)
989 {
990     static int *ivec_ptr=NULL,already_called=0,job_size=0;
991     int i,rc;
992     int my_rank;
993     char *tmp_buf,*out_ptr;
994
995     if(!already_called) {
996
997         rc = PMI_Get_size(&job_size);
998         CmiAssert(rc == PMI_SUCCESS);
999         rc = PMI_Get_rank(&my_rank);
1000         CmiAssert(rc == PMI_SUCCESS);
1001
1002         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1003         CmiAssert(ivec_ptr != NULL);
1004
1005         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1006         CmiAssert(rc == PMI_SUCCESS);
1007
1008         already_called = 1;
1009
1010     }
1011
1012     tmp_buf = (char *)malloc(job_size * len);
1013     CmiAssert(tmp_buf);
1014
1015     rc = PMI_Allgather(in,tmp_buf,len);
1016     CmiAssert(rc == PMI_SUCCESS);
1017
1018     out_ptr = out;
1019
1020     for(i=0;i<job_size;i++) {
1021
1022         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1023
1024     }
1025
1026     free(tmp_buf);
1027 }
1028
1029 static void
1030 allgather_2(void *in,void *out, int len)
1031 {
1032     //PMI_Allgather is out of order
1033     int i,rc, extend_len;
1034     int  rank_index;
1035     char *out_ptr, *out_ref;
1036     char *in2;
1037
1038     extend_len = sizeof(int) + len;
1039     in2 = (char*)malloc(extend_len);
1040
1041     memcpy(in2, &myrank, sizeof(int));
1042     memcpy(in2+sizeof(int), in, len);
1043
1044     out_ptr = (char*)malloc(mysize*extend_len);
1045
1046     rc = PMI_Allgather(in2, out_ptr, extend_len);
1047     GNI_RC_CHECK("allgather", rc);
1048
1049     out_ref = out;
1050
1051     for(i=0;i<mysize;i++) {
1052         //rank index 
1053         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1054         //copy to the rank index slot
1055         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1056     }
1057
1058     free(out_ptr);
1059     free(in2);
1060
1061 }
1062
1063 static unsigned int get_gni_nic_address(int device_id)
1064 {
1065     unsigned int address, cpu_id;
1066     gni_return_t status;
1067     int i, alps_dev_id=-1,alps_address=-1;
1068     char *token, *p_ptr;
1069
1070     p_ptr = getenv("PMI_GNI_DEV_ID");
1071     if (!p_ptr) {
1072         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1073        
1074         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1075     } else {
1076         while ((token = strtok(p_ptr,":")) != NULL) {
1077             alps_dev_id = atoi(token);
1078             if (alps_dev_id == device_id) {
1079                 break;
1080             }
1081             p_ptr = NULL;
1082         }
1083         CmiAssert(alps_dev_id != -1);
1084         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1085         CmiAssert(p_ptr != NULL);
1086         i = 0;
1087         while ((token = strtok(p_ptr,":")) != NULL) {
1088             if (i == alps_dev_id) {
1089                 alps_address = atoi(token);
1090                 break;
1091             }
1092             p_ptr = NULL;
1093             ++i;
1094         }
1095         CmiAssert(alps_address != -1);
1096         address = alps_address;
1097     }
1098     return address;
1099 }
1100
1101 static uint8_t get_ptag(void)
1102 {
1103     char *p_ptr, *token;
1104     uint8_t ptag;
1105
1106     p_ptr = getenv("PMI_GNI_PTAG");
1107     CmiAssert(p_ptr != NULL);
1108     token = strtok(p_ptr, ":");
1109     ptag = (uint8_t)atoi(token);
1110     return ptag;
1111         
1112 }
1113
1114 static uint32_t get_cookie(void)
1115 {
1116     uint32_t cookie;
1117     char *p_ptr, *token;
1118
1119     p_ptr = getenv("PMI_GNI_COOKIE");
1120     CmiAssert(p_ptr != NULL);
1121     token = strtok(p_ptr, ":");
1122     cookie = (uint32_t)atoi(token);
1123
1124     return cookie;
1125 }
1126
1127 #if LARGEPAGE
1128
1129 /* directly mmap memory from hugetlbfs for large pages */
1130
1131 #include <sys/stat.h>
1132 #include <fcntl.h>
1133 #include <sys/mman.h>
1134 #include <hugetlbfs.h>
1135
1136 // size must be _tlbpagesize aligned
1137 void *my_get_huge_pages(size_t size)
1138 {
1139     char filename[512];
1140     int fd;
1141     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1142     void *ptr = NULL;
1143
1144     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1145     fd = open(filename, O_RDWR | O_CREAT, mode);
1146     if (fd == -1) {
1147         CmiAbort("my_get_huge_pages: open filed");
1148     }
1149     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1150     if (ptr == MAP_FAILED) ptr = NULL;
1151 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1152     close(fd);
1153     unlink(filename);
1154     return ptr;
1155 }
1156
1157 void my_free_huge_pages(void *ptr, int size)
1158 {
1159 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1160     int ret = munmap(ptr, size);
1161     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1162 }
1163
1164 #endif
1165
1166 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1167 /* TODO: add any that are related */
1168 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1169
1170
1171 #include "machine-lrts.h"
1172 #include "machine-common-core.c"
1173
1174 /* Network progress function is used to poll the network when for
1175    messages. This flushes receive buffers on some  implementations*/
1176 #if CMK_MACHINE_PROGRESS_DEFINED
1177 void CmiMachineProgressImpl() {
1178 }
1179 #endif
1180
1181 static int SendBufferMsg(SMSG_QUEUE *queue);
1182 static void SendRdmaMsg();
1183 static void PumpNetworkSmsg();
1184 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1185 #if CQWRITE
1186 static void PumpCqWriteTransactions();
1187 #endif
1188 #if REMOTE_EVENT
1189 static void PumpRemoteTransactions();
1190 #endif
1191
1192 #if MACHINE_DEBUG_LOG
1193 FILE *debugLog = NULL;
1194 static CmiInt8 buffered_recv_msg = 0;
1195 int         lrts_smsg_success = 0;
1196 int         lrts_received_msg = 0;
1197 #endif
1198
1199 static void sweep_mempool(mempool_type *mptr)
1200 {
1201     int n = 0;
1202     block_header *current = &(mptr->block_head);
1203
1204     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1205     while( current!= NULL) {
1206         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1207         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1208     }
1209     printf("[n %d] sweep_mempool slot END.\n", myrank);
1210 }
1211
1212 inline
1213 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1214 {
1215     block_header *current = *from;
1216
1217     //while(register_memory_size>= MAX_REG_MEM)
1218     //{
1219         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1220             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1221
1222         *from = current;
1223         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1224         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1225         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1226     //}
1227     return GNI_RC_SUCCESS;
1228 }
1229
1230 inline 
1231 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1232 {
1233     gni_return_t status = GNI_RC_SUCCESS;
1234     //int size = GetMempoolsize(msg);
1235     //void *blockaddr = GetMempoolBlockPtr(msg);
1236     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1237    
1238     block_header *current = &(mptr->block_head);
1239     while(register_memory_size>= MAX_REG_MEM)
1240     {
1241         status = deregisterMemory(mptr, &current);
1242         if (status != GNI_RC_SUCCESS) break;
1243     }
1244     if(register_memory_size>= MAX_REG_MEM) return status;
1245
1246     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1247     while(1)
1248     {
1249         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1250         if(status == GNI_RC_SUCCESS)
1251         {
1252             break;
1253         }
1254         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1255         {
1256             CmiAbort("Memory registor for mempool fails\n");
1257         }
1258         else
1259         {
1260             status = deregisterMemory(mptr, &current);
1261             if (status != GNI_RC_SUCCESS) break;
1262         }
1263     }; 
1264     return status;
1265 }
1266
1267 inline 
1268 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1269 {
1270     static int rank = -1;
1271     int i;
1272     gni_return_t status;
1273     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1274     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1275     mempool_type *mptr;
1276
1277     status = registerFromMempool(mptr1, msg, size, t, cqh);
1278     if (status == GNI_RC_SUCCESS) return status;
1279 #if CMK_SMP 
1280     for (i=0; i<CmiMyNodeSize()+1; i++) {
1281       rank = (rank+1)%(CmiMyNodeSize()+1);
1282       mptr = CpvAccessOther(mempool, rank);
1283       if (mptr == mptr1) continue;
1284       status = registerFromMempool(mptr, msg, size, t, cqh);
1285       if (status == GNI_RC_SUCCESS) return status;
1286     }
1287 #endif
1288     return  GNI_RC_ERROR_RESOURCE;
1289 }
1290
1291 inline
1292 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1293 {
1294     MSG_LIST        *msg_tmp;
1295     MallocMsgList(msg_tmp);
1296     msg_tmp->destNode = destNode;
1297     msg_tmp->size   = size;
1298     msg_tmp->msg    = msg;
1299     msg_tmp->tag    = tag;
1300 #if CMK_WITH_STATS
1301     SMSG_CREATION(msg_tmp)
1302 #endif
1303 #if !CMK_SMP
1304     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1305         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1306         queue->smsg_head_index = destNode;
1307         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1308     }else
1309     {
1310         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1311         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1312     }
1313 #else
1314 #if ONE_SEND_QUEUE
1315     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1316 #else
1317 #if SMP_LOCKS
1318     CmiLock(queue->smsg_msglist_index[destNode].lock);
1319     if(queue->smsg_msglist_index[destNode].pushed == 0)
1320     {
1321         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1322     }
1323     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1324     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1325 #else
1326     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1327 #endif
1328 #endif
1329 #endif
1330 #if PRINT_SYH
1331     buffered_smsg_counter++;
1332 #endif
1333 }
1334
1335 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1336 {
1337     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1338 }
1339
1340 inline
1341 static void setup_smsg_connection(int destNode)
1342 {
1343     mdh_addr_list_t  *new_entry = 0;
1344     gni_post_descriptor_t *pd;
1345     gni_smsg_attr_t      *smsg_attr;
1346     gni_return_t status = GNI_RC_NOT_DONE;
1347     RDMA_REQUEST        *rdma_request_msg;
1348     
1349     if(smsg_available_slot == smsg_expand_slots)
1350     {
1351         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1352         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1353         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1354
1355         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1356             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1357             GNI_MEM_READWRITE,   
1358             -1,
1359             &(new_entry->mdh));
1360         smsg_available_slot = 0; 
1361         new_entry->next = smsg_dynamic_list;
1362         smsg_dynamic_list = new_entry;
1363     }
1364     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1365     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1366     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1367     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1368     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1369     smsg_attr->buff_size = smsg_memlen;
1370     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1371     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1372     smsg_local_attr_vec[destNode] = smsg_attr;
1373     smsg_available_slot++;
1374     MallocPostDesc(pd);
1375     pd->type            = GNI_POST_FMA_PUT;
1376     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1377     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1378     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1379     pd->length          = sizeof(gni_smsg_attr_t);
1380     pd->local_addr      = (uint64_t) smsg_attr;
1381     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1382     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1383     pd->src_cq_hndl     = rdma_tx_cqh;
1384     pd->rdma_mode       = 0;
1385     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1386     print_smsg_attr(smsg_attr);
1387     if(status == GNI_RC_ERROR_RESOURCE )
1388     {
1389         MallocRdmaRequest(rdma_request_msg);
1390         rdma_request_msg->destNode = destNode;
1391         rdma_request_msg->pd = pd;
1392         /* buffer this request */
1393     }
1394 #if PRINT_SYH
1395     if(status != GNI_RC_SUCCESS)
1396        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1397     else
1398         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1399 #endif
1400 }
1401
1402 /* useDynamicSMSG */
1403 inline 
1404 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1405 {
1406     gni_return_t status = GNI_RC_NOT_DONE;
1407
1408     if(mailbox_list->offset == mailbox_list->size)
1409     {
1410         dynamic_smsg_mailbox_t *new_mailbox_entry;
1411         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1412         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1413         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1414         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1415         new_mailbox_entry->offset = 0;
1416         
1417         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1418             new_mailbox_entry->size, smsg_rx_cqh,
1419             GNI_MEM_READWRITE,   
1420             -1,
1421             &(new_mailbox_entry->mem_hndl));
1422
1423         GNI_RC_CHECK("register", status);
1424         new_mailbox_entry->next = mailbox_list;
1425         mailbox_list = new_mailbox_entry;
1426     }
1427     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1428     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1429     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1430     local_smsg_attr->mbox_offset = mailbox_list->offset;
1431     mailbox_list->offset += smsg_memlen;
1432     local_smsg_attr->buff_size = smsg_memlen;
1433     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1434     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1435 }
1436
1437 /* useDynamicSMSG */
1438 inline 
1439 static int connect_to(int destNode)
1440 {
1441     gni_return_t status = GNI_RC_NOT_DONE;
1442     CmiAssert(smsg_connected_flag[destNode] == 0);
1443     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1444     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1445     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1446     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1447     
1448     CMI_GNI_LOCK(global_gni_lock)
1449     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1450     CMI_GNI_UNLOCK(global_gni_lock)
1451     if (status == GNI_RC_ERROR_RESOURCE) {
1452       /* possibly destNode is making connection at the same time */
1453       free(smsg_attr_vector_local[destNode]);
1454       smsg_attr_vector_local[destNode] = NULL;
1455       free(smsg_attr_vector_remote[destNode]);
1456       smsg_attr_vector_remote[destNode] = NULL;
1457       mailbox_list->offset -= smsg_memlen;
1458       return 0;
1459     }
1460     GNI_RC_CHECK("GNI_Post", status);
1461     smsg_connected_flag[destNode] = 1;
1462     return 1;
1463 }
1464
1465 inline 
1466 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1467 {
1468     unsigned int          remote_address;
1469     uint32_t              remote_id;
1470     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1471     gni_smsg_attr_t       *smsg_attr;
1472     gni_post_descriptor_t *pd;
1473     gni_post_state_t      post_state;
1474     char                  *real_data; 
1475
1476     if (useDynamicSMSG) {
1477         switch (smsg_connected_flag[destNode]) {
1478         case 0: 
1479             connect_to(destNode);         /* continue to case 1 */
1480         case 1:                           /* pending connection, do nothing */
1481             status = GNI_RC_NOT_DONE;
1482             if(inbuff ==0)
1483                 buffer_small_msgs(queue, msg, size, destNode, tag);
1484             return status;
1485         }
1486     }
1487 #if CMK_SMP
1488 #if ! ONE_SEND_QUEUE
1489     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1490 #endif
1491     {
1492 #else
1493     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1494     {
1495 #endif
1496         //CMI_GNI_LOCK(smsg_mailbox_lock)
1497         CMI_GNI_LOCK(default_tx_cq_lock)
1498 #if CMK_SMP_TRACE_COMMTHREAD
1499         int oldpe = -1;
1500         int oldeventid = -1;
1501         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1502         { 
1503             START_EVENT();
1504             if ( tag == SMALL_DATA_TAG)
1505                 real_data = (char*)msg; 
1506             else 
1507                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1508             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1509             TRACE_COMM_SET_COMM_MSGID(real_data);
1510         }
1511 #endif
1512 #if REMOTE_EVENT
1513         if (tag == LMSG_INIT_TAG) {
1514             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1515             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1516                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1517         }
1518         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1519 #endif
1520 #if     CMK_WITH_STATS
1521         SMSG_TRY_SEND(tag)
1522 #endif
1523 #if CMK_WITH_STATS
1524     double              creation_time;
1525     if (ptr == NULL)
1526         creation_time = CmiWallTimer();
1527     else
1528         creation_time = ptr->creation_time;
1529 #endif
1530
1531     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1532 #if CMK_SMP_TRACE_COMMTHREAD
1533         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1534 #endif
1535         CMI_GNI_UNLOCK(default_tx_cq_lock)
1536         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1537         if(status == GNI_RC_SUCCESS)
1538         {
1539 #if     CMK_WITH_STATS
1540             SMSG_SENT_DONE(creation_time,tag) 
1541 #endif
1542 #if CMK_SMP_TRACE_COMMTHREAD
1543             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1544             { 
1545                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1546             }
1547 #endif
1548         }else
1549             status = GNI_RC_ERROR_RESOURCE;
1550     }
1551     if(status != GNI_RC_SUCCESS && inbuff ==0)
1552         buffer_small_msgs(queue, msg, size, destNode, tag);
1553     return status;
1554 }
1555
1556 inline 
1557 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1558 {
1559     /* construct a control message and send */
1560     CONTROL_MSG         *control_msg_tmp;
1561     MallocControlMsg(control_msg_tmp);
1562     control_msg_tmp->source_addr = (uint64_t)msg;
1563     control_msg_tmp->seq_id    = seqno;
1564     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1565 #if REMOTE_EVENT
1566     control_msg_tmp->ack_index    =  -1;
1567 #endif
1568 #if     USE_LRTS_MEMPOOL
1569     if(size < BIG_MSG)
1570     {
1571         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1572     }
1573     else
1574     {
1575         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1576         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1577         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1578     }
1579 #else
1580     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1581 #endif
1582     return control_msg_tmp;
1583 }
1584
1585 #define BLOCKING_SEND_CONTROL    0
1586
1587 // Large message, send control to receiver, receiver register memory and do a GET, 
1588 // return 1 - send no success
1589 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1590 {
1591     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1592     uint32_t            vmdh_index  = -1;
1593     int                 size;
1594     int                 offset = 0;
1595     uint64_t            source_addr;
1596     int                 register_size; 
1597     void                *msg;
1598
1599     size    =   control_msg_tmp->total_length;
1600     source_addr = control_msg_tmp->source_addr;
1601     register_size = control_msg_tmp->length;
1602
1603 #if  USE_LRTS_MEMPOOL
1604     if( control_msg_tmp->seq_id == 0 ){
1605 #if BLOCKING_SEND_CONTROL
1606         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1607             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1608                 LrtsAdvanceCommunication(0);
1609         }
1610 #endif
1611         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1612         {
1613             msg = (void*)source_addr;
1614             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1615             {
1616                 if(!inbuff)
1617                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1618                 return GNI_RC_ERROR_NOMEM;
1619             }
1620             //register the corresponding mempool
1621             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1622             if(status == GNI_RC_SUCCESS)
1623             {
1624                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1625             }
1626         }else
1627         {
1628             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1629             status = GNI_RC_SUCCESS;
1630         }
1631         if(NoMsgInSend(source_addr))
1632             register_size = GetMempoolsize((void*)(source_addr));
1633         else
1634             register_size = 0;
1635     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1636     {
1637         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1638         source_addr += offset;
1639         size = control_msg_tmp->length;
1640 #if BLOCKING_SEND_CONTROL
1641         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1642             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1643                 LrtsAdvanceCommunication(0);
1644         }
1645 #endif
1646         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1647             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1648             {
1649                 if(!inbuff)
1650                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1651                 return GNI_RC_ERROR_NOMEM;
1652             }
1653             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1654             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1655         }
1656         else
1657         {
1658             status = GNI_RC_SUCCESS;
1659         }
1660         register_size = 0;  
1661     }
1662
1663     if(status == GNI_RC_SUCCESS)
1664     {
1665         status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1666         if(status == GNI_RC_SUCCESS)
1667         {
1668             buffered_send_msg += register_size;
1669             if(control_msg_tmp->seq_id == 0)
1670             {
1671                 IncreaseMsgInSend(source_addr);
1672             }
1673             FreeControlMsg(control_msg_tmp);
1674             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1675         }else
1676             status = GNI_RC_ERROR_RESOURCE;
1677
1678     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1679     {
1680         CmiAbort("Memory registor for large msg\n");
1681     }else 
1682     {
1683         status = GNI_RC_ERROR_NOMEM; 
1684         if(!inbuff)
1685             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1686     }
1687     return status;
1688 #else
1689     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1690     if(status == GNI_RC_SUCCESS)
1691     {
1692         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1693         if(status == GNI_RC_SUCCESS)
1694         {
1695             FreeControlMsg(control_msg_tmp);
1696         }
1697     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1698     {
1699         CmiAbort("Memory registor for large msg\n");
1700     }else 
1701     {
1702         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1703     }
1704     return status;
1705 #endif
1706 }
1707
1708 inline void LrtsPrepareEnvelope(char *msg, int size)
1709 {
1710     CmiSetMsgSize(msg, size);
1711     CMI_SET_CHECKSUM(msg, size);
1712 }
1713
1714 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1715 {
1716     gni_return_t        status  =   GNI_RC_SUCCESS;
1717     uint8_t tag;
1718     CONTROL_MSG         *control_msg_tmp;
1719     int                 oob = ( mode & OUT_OF_BAND);
1720     SMSG_QUEUE          *queue;
1721
1722     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1723 #if CMK_USE_OOB
1724     queue = oob? &smsg_oob_queue : &smsg_queue;
1725 #else
1726     queue = &smsg_queue;
1727 #endif
1728
1729     LrtsPrepareEnvelope(msg, size);
1730
1731 #if PRINT_SYH
1732     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1733 #endif 
1734 #if CMK_SMP 
1735     if(size <= SMSG_MAX_MSG)
1736         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1737     else if (size < BIG_MSG) {
1738         control_msg_tmp =  construct_control_msg(size, msg, 0);
1739         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1740     }
1741     else {
1742           CmiSetMsgSeq(msg, 0);
1743           control_msg_tmp =  construct_control_msg(size, msg, 1);
1744           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1745     }
1746 #else   //non-smp, smp(worker sending)
1747     if(size <= SMSG_MAX_MSG)
1748     {
1749         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1750             CmiFree(msg);
1751     }
1752     else if (size < BIG_MSG) {
1753         control_msg_tmp =  construct_control_msg(size, msg, 0);
1754         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1755     }
1756     else {
1757 #if     USE_LRTS_MEMPOOL
1758         CmiSetMsgSeq(msg, 0);
1759         control_msg_tmp =  construct_control_msg(size, msg, 1);
1760         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1761 #else
1762         control_msg_tmp =  construct_control_msg(size, msg, 0);
1763         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1764 #endif
1765     }
1766 #endif
1767     return 0;
1768 }
1769
1770 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1771 {
1772   int i;
1773 #if CMK_BROADCAST_USE_CMIREFERENCE
1774   for(i=0;i<npes;i++) {
1775     if (pes[i] == CmiMyPe())
1776       CmiSyncSend(pes[i], len, msg);
1777     else {
1778       CmiReference(msg);
1779       CmiSyncSendAndFree(pes[i], len, msg);
1780     }
1781   }
1782 #else
1783   for(i=0;i<npes;i++) {
1784     CmiSyncSend(pes[i], len, msg);
1785   }
1786 #endif
1787 }
1788
1789 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1790 {
1791   /* A better asynchronous implementation may be wanted, but at least it works */
1792   CmiSyncListSendFn(npes, pes, len, msg);
1793   return (CmiCommHandle) 0;
1794 }
1795
1796 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1797 {
1798   if (npes == 1) {
1799       CmiSyncSendAndFree(pes[0], len, msg);
1800       return;
1801   }
1802 #if CMK_PERSISTENT_COMM
1803   if (CpvAccess(phs) && len > 1024) {
1804       int i;
1805       for(i=0;i<npes;i++) {
1806         if (pes[i] == CmiMyPe())
1807           CmiSyncSend(pes[i], len, msg);
1808         else {
1809           CmiReference(msg);
1810           CmiSyncSendAndFree(pes[i], len, msg);
1811         }
1812       }
1813       CmiFree(msg);
1814       return;
1815   }
1816 #endif
1817   
1818 #if CMK_BROADCAST_USE_CMIREFERENCE
1819   if (npes == 1) {
1820     CmiSyncSendAndFree(pes[0], len, msg);
1821     return;
1822   }
1823   CmiSyncListSendFn(npes, pes, len, msg);
1824   CmiFree(msg);
1825 #else
1826   int i;
1827   for(i=0;i<npes-1;i++) {
1828     CmiSyncSend(pes[i], len, msg);
1829   }
1830   if (npes>0)
1831     CmiSyncSendAndFree(pes[npes-1], len, msg);
1832   else 
1833     CmiFree(msg);
1834 #endif
1835 }
1836
1837 static void    PumpDatagramConnection();
1838 static      int         event_SetupConnect = 111;
1839 static      int         event_PumpSmsg = 222 ;
1840 static      int         event_PumpTransaction = 333;
1841 static      int         event_PumpRdmaTransaction = 444;
1842 static      int         event_SendBufferSmsg = 444;
1843 static      int         event_SendFmaRdmaMsg = 555;
1844 static      int         event_AdvanceCommunication = 666;
1845
1846 static void registerUserTraceEvents() {
1847 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1848     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1849     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1850     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1851     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1852     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1853     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1854     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1855 #endif
1856 }
1857
1858 static void ProcessDeadlock()
1859 {
1860     static CmiUInt8 *ptr = NULL;
1861     static CmiUInt8  last = 0, mysum, sum;
1862     static int count = 0;
1863     gni_return_t status;
1864     int i;
1865
1866 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1867 //sweep_mempool(CpvAccess(mempool));
1868     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1869     mysum = smsg_send_count + smsg_recv_count;
1870     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1871     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1872     GNI_RC_CHECK("PMI_Allgather", status);
1873     sum = 0;
1874     for (i=0; i<mysize; i++)  sum+= ptr[i];
1875     if (last == 0 || sum == last) 
1876         count++;
1877     else
1878         count = 0;
1879     last = sum;
1880     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1881     if (count == 2) { 
1882         /* detected twice, it is a real deadlock */
1883         if (myrank == 0)  {
1884             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1885             CmiAbort("Fatal> Deadlock detected.");
1886         }
1887
1888     }
1889     _detected_hang = 0;
1890 }
1891
1892 static void CheckProgress()
1893 {
1894     if (smsg_send_count == last_smsg_send_count &&
1895         smsg_recv_count == last_smsg_recv_count ) 
1896     {
1897         _detected_hang = 1;
1898 #if !CMK_SMP
1899         if (_detected_hang) ProcessDeadlock();
1900 #endif
1901
1902     }
1903     else {
1904         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1905         last_smsg_send_count = smsg_send_count;
1906         last_smsg_recv_count = smsg_recv_count;
1907         _detected_hang = 0;
1908     }
1909 }
1910
1911 static void set_limit()
1912 {
1913     //if (!user_set_flag && CmiMyRank() == 0) {
1914     if (CmiMyRank() == 0) {
1915         int mynode = CmiPhysicalNodeID(CmiMyPe());
1916         int numpes = CmiNumPesOnPhysicalNode(mynode);
1917         int numprocesses = numpes / CmiMyNodeSize();
1918         MAX_REG_MEM  = _totalmem / numprocesses;
1919         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1920         if (CmiMyPe() == 0)
1921            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1922         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1923         {
1924              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1925              CmiAbort("memory registration\n");
1926         }
1927     }
1928 }
1929
1930 void LrtsPostCommonInit(int everReturn)
1931 {
1932 #if CMK_DIRECT
1933     CmiDirectInit();
1934 #endif
1935 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1936     CpvInitialize(double, projTraceStart);
1937     /* only PE 0 needs to care about registration (to generate sts file). */
1938     //if (CmiMyPe() == 0) 
1939     {
1940         registerMachineUserEventsFunction(&registerUserTraceEvents);
1941     }
1942 #endif
1943
1944 #if CMK_SMP
1945     CmiIdleState *s=CmiNotifyGetState();
1946     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1947     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1948 #else
1949     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1950     if (useDynamicSMSG)
1951     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1952 #endif
1953
1954 #if ! LARGEPAGE
1955     if (_checkProgress)
1956 #if CMK_SMP
1957     if (CmiMyRank() == 0)
1958 #endif
1959     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1960 #endif
1961  
1962 #if !LARGEPAGE
1963     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1964 #endif
1965 }
1966
1967 /* this is called by worker thread */
1968 void LrtsPostNonLocal(){
1969 #if CMK_SMP_TRACE_COMMTHREAD
1970     double startT, endT;
1971 #endif
1972 #if MULTI_THREAD_SEND
1973     if(mysize == 1) return;
1974 #if CMK_SMP_TRACE_COMMTHREAD
1975     traceEndIdle();
1976 #endif
1977
1978 #if CMK_SMP_TRACE_COMMTHREAD
1979     startT = CmiWallTimer();
1980 #endif
1981
1982 #if CMK_WORKER_SINGLE_TASK
1983     if (CmiMyRank() % 6 == 0)
1984 #endif
1985     PumpNetworkSmsg();
1986
1987 #if CMK_WORKER_SINGLE_TASK
1988     if (CmiMyRank() % 6 == 1)
1989 #endif
1990     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1991
1992 #if CMK_WORKER_SINGLE_TASK
1993     if (CmiMyRank() % 6 == 2)
1994 #endif
1995     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1996
1997 #if REMOTE_EVENT
1998 #if CMK_WORKER_SINGLE_TASK
1999     if (CmiMyRank() % 6 == 3)
2000 #endif
2001     PumpRemoteTransactions();
2002 #endif
2003
2004 #if CMK_USE_OOB
2005     if (SendBufferMsg(&smsg_oob_queue) == 1)
2006 #endif
2007     {
2008 #if CMK_WORKER_SINGLE_TASK
2009     if (CmiMyRank() % 6 == 4)
2010 #endif
2011         SendBufferMsg(&smsg_queue);
2012     }
2013
2014 #if CMK_WORKER_SINGLE_TASK
2015     if (CmiMyRank() % 6 == 5)
2016 #endif
2017     SendRdmaMsg();
2018
2019 #if CMK_SMP_TRACE_COMMTHREAD
2020     endT = CmiWallTimer();
2021     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2022 #endif
2023 #if CMK_SMP_TRACE_COMMTHREAD
2024     traceBeginIdle();
2025 #endif
2026 #endif
2027 }
2028
2029 /* useDynamicSMSG */
2030 static void    PumpDatagramConnection()
2031 {
2032     uint32_t          remote_address;
2033     uint32_t          remote_id;
2034     gni_return_t status;
2035     gni_post_state_t  post_state;
2036     uint64_t          datagram_id;
2037     int i;
2038
2039    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2040    {
2041        if (datagram_id >= mysize) {           /* bound endpoint */
2042            int pe = datagram_id - mysize;
2043            CMI_GNI_LOCK(global_gni_lock)
2044            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2045            CMI_GNI_UNLOCK(global_gni_lock)
2046            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2047            {
2048                CmiAssert(remote_id == pe);
2049                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2050                GNI_RC_CHECK("Dynamic SMSG Init", status);
2051 #if PRINT_SYH
2052                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
2053 #endif
2054                CmiAssert(smsg_connected_flag[pe] == 1);
2055                smsg_connected_flag[pe] = 2;
2056            }
2057        }
2058        else {         /* unbound ep */
2059            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2060            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2061            {
2062                CmiAssert(remote_id<mysize);
2063                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2064                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2065                GNI_RC_CHECK("Dynamic SMSG Init", status);
2066 #if PRINT_SYH
2067                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
2068 #endif
2069                smsg_connected_flag[remote_id] = 2;
2070
2071                alloc_smsg_attr(&send_smsg_attr);
2072                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2073                GNI_RC_CHECK("post unbound datagram", status);
2074            }
2075        }
2076    }
2077 }
2078
2079 /* pooling CQ to receive network message */
2080 static void PumpNetworkRdmaMsgs()
2081 {
2082     gni_cq_entry_t      event_data;
2083     gni_return_t        status;
2084
2085 }
2086
2087 inline 
2088 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
2089 {
2090     RDMA_REQUEST        *rdma_request_msg;
2091     MallocRdmaRequest(rdma_request_msg);
2092     rdma_request_msg->destNode = inst_id;
2093     rdma_request_msg->pd = pd;
2094 #if REMOTE_EVENT
2095     rdma_request_msg->ack_index = ack_index;
2096 #endif
2097 #if CMK_SMP
2098     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2099 #else
2100     if(sendRdmaBuf == 0)
2101     {
2102         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2103     }else{
2104         sendRdmaTail->next = rdma_request_msg;
2105         sendRdmaTail =  rdma_request_msg;
2106     }
2107 #endif
2108
2109 }
2110
2111 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2112
2113 static void PumpNetworkSmsg()
2114 {
2115     uint64_t            inst_id;
2116     int                 ret;
2117     gni_cq_entry_t      event_data;
2118     gni_return_t        status, status2;
2119     void                *header;
2120     uint8_t             msg_tag;
2121     int                 msg_nbytes;
2122     void                *msg_data;
2123     gni_mem_handle_t    msg_mem_hndl;
2124     gni_smsg_attr_t     *smsg_attr;
2125     gni_smsg_attr_t     *remote_smsg_attr;
2126     int                 init_flag;
2127     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2128     uint64_t            source_addr;
2129     SMSG_QUEUE         *queue = &smsg_queue;
2130 #if   CMK_DIRECT
2131     cmidirectMsg        *direct_msg;
2132 #endif
2133 #if CMI_EXERT_RECV_CAP
2134     int                  recv_cnt = 0;
2135 #endif
2136     while(1)
2137     {
2138         CMI_GNI_LOCK(smsg_rx_cq_lock)
2139         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2140         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2141         if(status != GNI_RC_SUCCESS)
2142             break;
2143         inst_id = GNI_CQ_GET_INST_ID(event_data);
2144 #if REMOTE_EVENT
2145         inst_id = GET_RANK(inst_id);      /* important */
2146 #endif
2147         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2148 #if PRINT_SYH
2149         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2150 #endif
2151         if (useDynamicSMSG) {
2152             /* subtle: smsg may come before connection is setup */
2153             while (smsg_connected_flag[inst_id] != 2) 
2154                PumpDatagramConnection();
2155         }
2156         msg_tag = GNI_SMSG_ANY_TAG;
2157         while(1) {
2158             CMI_GNI_LOCK(smsg_mailbox_lock)
2159             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2160             if (status != GNI_RC_SUCCESS)
2161             {
2162                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2163                 break;
2164             }
2165 #if PRINT_SYH
2166             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2167 #endif
2168             /* copy msg out and then put into queue (small message) */
2169             switch (msg_tag) {
2170             case SMALL_DATA_TAG:
2171             {
2172                 START_EVENT();
2173                 msg_nbytes = CmiGetMsgSize(header);
2174                 msg_data    = CmiAlloc(msg_nbytes);
2175                 memcpy(msg_data, (char*)header, msg_nbytes);
2176                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2177                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2178                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
2179                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2180                 handleOneRecvedMsg(msg_nbytes, msg_data);
2181                 break;
2182             }
2183             case LMSG_INIT_TAG:
2184             {
2185 #if MULTI_THREAD_SEND
2186                 MallocControlMsg(control_msg_tmp);
2187                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2188                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2189                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2190                 getLargeMsgRequest(control_msg_tmp, inst_id);
2191                 FreeControlMsg(control_msg_tmp);
2192 #else
2193                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2194                 getLargeMsgRequest(header, inst_id);
2195                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2196 #endif
2197                 break;
2198             }
2199 #if !REMOTE_EVENT && !CQWRITE
2200             case ACK_TAG:   //msg fit into mempool
2201             {
2202                 /* Get is done, release message . Now put is not used yet*/
2203                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2204                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2205                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2206 #if ! USE_LRTS_MEMPOOL
2207                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2208 #else
2209                 DecreaseMsgInSend(msg);
2210 #endif
2211                 if(NoMsgInSend(msg))
2212                     buffered_send_msg -= GetMempoolsize(msg);
2213                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2214                 CmiFree(msg);
2215                 break;
2216             }
2217 #endif
2218             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2219             {
2220 #if MULTI_THREAD_SEND
2221                 MallocControlMsg(header_tmp);
2222                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2223                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2224 #else
2225                 header_tmp = (CONTROL_MSG *) header;
2226 #endif
2227                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2228                 void *msg = (void*)(header_tmp->source_addr);
2229                 int cur_seq = CmiGetMsgSeq(msg);
2230                 int offset = ONE_SEG*(cur_seq+1);
2231                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2232                 buffered_send_msg -= header_tmp->length;
2233                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2234                 if (remain_size < 0) remain_size = 0;
2235                 CmiSetMsgSize(msg, remain_size);
2236                 if(remain_size <= 0) //transaction done
2237                 {
2238                     CmiFree(msg);
2239                 }else if (header_tmp->total_length > offset)
2240                 {
2241                     CmiSetMsgSeq(msg, cur_seq+1);
2242                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2243                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2244                     //send next seg
2245                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2246                          // pipelining
2247                     if (header_tmp->seq_id == 1) {
2248                       int i;
2249                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2250                         int seq = cur_seq+i+2;
2251                         CmiSetMsgSeq(msg, seq-1);
2252                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2253                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2254                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2255                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2256                       }
2257                     }
2258                 }
2259 #if MULTI_THREAD_SEND
2260                 FreeControlMsg(header_tmp);
2261 #else
2262                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2263 #endif
2264                 break;
2265             }
2266 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2267             case PUT_DONE_TAG:  {   //persistent message
2268                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2269                 int size = ((CONTROL_MSG *) header)->length;
2270                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2271                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2272                 CmiReference(msg);
2273                 CMI_CHECK_CHECKSUM(msg, size);
2274                 handleOneRecvedMsg(size, msg); 
2275 #if PRINT_SYH
2276                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2277 #endif
2278                 break;
2279             }
2280 #endif
2281 #if CMK_DIRECT
2282             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2283                 //create a trigger message
2284                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2285                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2286                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2287                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2288                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2289                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2290                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2291                 break;
2292 #endif
2293             default:
2294                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2295                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2296                 printf("weird tag problem\n");
2297                 CmiAbort("Unknown tag\n");
2298             }               // end switch
2299 #if PRINT_SYH
2300             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2301 #endif
2302             smsg_recv_count ++;
2303             msg_tag = GNI_SMSG_ANY_TAG;
2304 #if CMI_EXERT_RECV_CAP
2305             if (status == GNI_RC_SUCCESS && ++recv_cnt == RECV_CAP) return;
2306 #endif
2307         } //endwhile GNI_SmsgGetNextWTag
2308     }   //end while GetEvent
2309     if(status == GNI_RC_ERROR_RESOURCE)
2310     {
2311         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2312         GNI_RC_CHECK("Smsg_rx_cq full", status);
2313     }
2314 }
2315
2316 static void printDesc(gni_post_descriptor_t *pd)
2317 {
2318     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2319 }
2320
2321 #if CQWRITE
2322 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2323 {
2324     gni_post_descriptor_t *pd;
2325     gni_return_t        status = GNI_RC_SUCCESS;
2326     
2327     MallocPostDesc(pd);
2328     pd->type = GNI_POST_CQWRITE;
2329     pd->cq_mode = GNI_CQMODE_SILENT;
2330     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2331     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2332     pd->cqwrite_value = data;
2333     pd->remote_mem_hndl = mem_hndl;
2334     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2335     GNI_RC_CHECK("GNI_PostCqWrite", status);
2336 }
2337 #endif
2338
2339 // register memory for a message
2340 // return mem handle
2341 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2342 {
2343     gni_return_t status = GNI_RC_SUCCESS;
2344
2345     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2346
2347 #if CMK_PERSISTENT_COMM
2348       // persistent message is always registered
2349       // BIG_MSG small pieces do not have malloc chunk header
2350     if ((seqno <= 1 || seqno == PERSIST_SEQ) && !IsMemHndlZero(MEMHFIELD(msg))) {
2351         *memh = MEMHFIELD(msg);
2352         return GNI_RC_SUCCESS;
2353     }
2354 #endif
2355     if(seqno == 0 
2356 #if CMK_PERSISTENT_COMM
2357          || seqno == PERSIST_SEQ
2358 #endif
2359       )
2360     {
2361         if(IsMemHndlZero((GetMemHndl(msg))))
2362         {
2363             msg = (void*)(msg);
2364             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2365             if(status == GNI_RC_SUCCESS)
2366                 *memh = GetMemHndl(msg);
2367         }
2368         else {
2369             *memh = GetMemHndl(msg);
2370         }
2371     }
2372     else {
2373         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2374         status = registerMemory(msg, size, memh, NULL); 
2375     }
2376     return status;
2377 }
2378
2379 // for BIG_MSG called on receiver side for receiving control message
2380 // LMSG_INIT_TAG
2381 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2382 {
2383 #if     USE_LRTS_MEMPOOL
2384     CONTROL_MSG         *request_msg;
2385     gni_return_t        status = GNI_RC_SUCCESS;
2386     void                *msg_data;
2387     gni_post_descriptor_t *pd;
2388     gni_mem_handle_t    msg_mem_hndl;
2389     int                 size, transaction_size, offset = 0;
2390     size_t              register_size = 0;
2391
2392     // initial a get to transfer data from the sender side */
2393     request_msg = (CONTROL_MSG *) header;
2394     size = request_msg->total_length;
2395     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2396     MallocPostDesc(pd);
2397 #if CMK_WITH_STATS 
2398     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2399 #endif
2400     if(request_msg->seq_id < 2)   {
2401 #if CMK_SMP_TRACE_COMMTHREAD 
2402         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2403 #endif
2404         msg_data = CmiAlloc(size);
2405         CmiSetMsgSeq(msg_data, 0);
2406         _MEMCHECK(msg_data);
2407     }
2408     else {
2409         offset = ONE_SEG*(request_msg->seq_id-1);
2410         msg_data = (char*)request_msg->dest_addr + offset;
2411     }
2412    
2413     pd->cqwrite_value = request_msg->seq_id;
2414
2415     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2416     SetMemHndlZero(pd->local_mem_hndl);
2417     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2418     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2419         if(NoMsgInRecv( (void*)(msg_data)))
2420             register_size = GetMempoolsize((void*)(msg_data));
2421     }
2422
2423     pd->first_operand = ALIGN64(size);                   //  total length
2424
2425     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2426         pd->type            = GNI_POST_FMA_GET;
2427     else
2428         pd->type            = GNI_POST_RDMA_GET;
2429     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2430     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2431     pd->length          = transaction_size;
2432     pd->local_addr      = (uint64_t) msg_data;
2433     pd->remote_addr     = request_msg->source_addr + offset;
2434     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2435     pd->src_cq_hndl     = rdma_tx_cqh;
2436     pd->rdma_mode       = 0;
2437     pd->amo_cmd         = 0;
2438 #if CMI_EXERT_RDMA_CAP
2439     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2440 #endif
2441     //memory registration success
2442     if(status == GNI_RC_SUCCESS )
2443     {
2444         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2445         CMI_GNI_LOCK(lock)
2446 #if REMOTE_EVENT
2447         if( request_msg->seq_id == 0)
2448         {
2449             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2450             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2451             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2452         }
2453 #endif
2454
2455 #if CMK_WITH_STATS
2456         RDMA_TRY_SEND(pd->type)
2457 #endif
2458         if(pd->type == GNI_POST_RDMA_GET) 
2459         {
2460             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2461         }
2462         else
2463         {
2464             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2465         }
2466         CMI_GNI_UNLOCK(lock)
2467
2468         if(status == GNI_RC_SUCCESS )
2469         {
2470 #if CMI_EXERT_RDMA_CAP
2471             RDMA_pending++;
2472 #endif
2473             if(pd->cqwrite_value == 0)
2474             {
2475 #if MACHINE_DEBUG_LOG
2476                 buffered_recv_msg += register_size;
2477                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2478 #endif
2479                 IncreaseMsgInRecv(msg_data);
2480 #if CMK_SMP_TRACE_COMMTHREAD 
2481                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2482 #endif
2483             }
2484 #if  CMK_WITH_STATS
2485             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2486             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2487 #endif
2488         }
2489     }else
2490     {
2491         SetMemHndlZero((pd->local_mem_hndl));
2492     }
2493     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2494     {
2495 #if REMOTE_EVENT
2496         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2497 #else
2498         bufferRdmaMsg(inst_id, pd, -1); 
2499 #endif
2500     }else if (status != GNI_RC_SUCCESS) {
2501         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2502         GNI_RC_CHECK("GetLargeAFter posting", status);
2503     }
2504 #else
2505     CONTROL_MSG         *request_msg;
2506     gni_return_t        status;
2507     void                *msg_data;
2508     gni_post_descriptor_t *pd;
2509     RDMA_REQUEST        *rdma_request_msg;
2510     gni_mem_handle_t    msg_mem_hndl;
2511     //int source;
2512     // initial a get to transfer data from the sender side */
2513     request_msg = (CONTROL_MSG *) header;
2514     msg_data = CmiAlloc(request_msg->length);
2515     _MEMCHECK(msg_data);
2516
2517     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2518
2519     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2520     {
2521         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2522     }
2523
2524     MallocPostDesc(pd);
2525     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2526         pd->type            = GNI_POST_FMA_GET;
2527     else
2528         pd->type            = GNI_POST_RDMA_GET;
2529     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2530     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2531     pd->length          = ALIGN64(request_msg->length);
2532     pd->local_addr      = (uint64_t) msg_data;
2533     pd->remote_addr     = request_msg->source_addr;
2534     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2535     pd->src_cq_hndl     = rdma_tx_cqh;
2536     pd->rdma_mode       = 0;
2537     pd->amo_cmd         = 0;
2538
2539     //memory registration successful
2540     if(status == GNI_RC_SUCCESS)
2541     {
2542         pd->local_mem_hndl  = msg_mem_hndl;
2543        
2544         if(pd->type == GNI_POST_RDMA_GET) 
2545         {
2546             CMI_GNI_LOCK(rdma_tx_cq_lock)
2547             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2548             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2549         }
2550         else
2551         {
2552             CMI_GNI_LOCK(default_tx_cq_lock)
2553             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2554             CMI_GNI_UNLOCK(default_tx_cq_lock)
2555         }
2556
2557     }else
2558     {
2559         SetMemHndlZero(pd->local_mem_hndl);
2560     }
2561     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2562     {
2563         MallocRdmaRequest(rdma_request_msg);
2564         rdma_request_msg->next = 0;
2565         rdma_request_msg->destNode = inst_id;
2566         rdma_request_msg->pd = pd;
2567         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2568     }else {
2569         GNI_RC_CHECK("AFter posting", status);
2570     }
2571 #endif
2572 }
2573
2574 #if CQWRITE
2575 static void PumpCqWriteTransactions()
2576 {
2577
2578     gni_cq_entry_t          ev;
2579     gni_return_t            status;
2580     void                    *msg;  
2581     int                     msg_size;
2582     while(1) {
2583         //CMI_GNI_LOCK(my_cq_lock) 
2584         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2585         //CMI_GNI_UNLOCK(my_cq_lock)
2586         if(status != GNI_RC_SUCCESS) break;
2587         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2588 #if CMK_PERSISTENT_COMM
2589 #if PRINT_SYH
2590         printf(" %d CQ write event %p\n", myrank, msg);
2591 #endif
2592         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2593 #if PRINT_SYH
2594             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2595 #endif
2596             CmiReference(msg);
2597             msg_size = CmiGetMsgSize(msg);
2598             CMI_CHECK_CHECKSUM(msg, msg_size);
2599             handleOneRecvedMsg(msg_size, msg); 
2600             continue;
2601         }
2602 #endif
2603 #if ! USE_LRTS_MEMPOOL
2604        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2605 #else
2606         DecreaseMsgInSend(msg);
2607 #endif
2608         if(NoMsgInSend(msg))
2609             buffered_send_msg -= GetMempoolsize(msg);
2610         CmiFree(msg);
2611     };
2612     if(status == GNI_RC_ERROR_RESOURCE)
2613     {
2614         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2615     }
2616 }
2617 #endif
2618
2619 #if REMOTE_EVENT
2620 static void PumpRemoteTransactions()
2621 {
2622     gni_cq_entry_t          ev;
2623     gni_return_t            status;
2624     void                    *msg;   
2625     int                     inst_id, index, type, size;
2626
2627     while(1) {
2628         CMI_GNI_LOCK(global_gni_lock)
2629         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2630         CMI_GNI_UNLOCK(global_gni_lock)
2631
2632         if(status != GNI_RC_SUCCESS) break;
2633
2634         inst_id = GNI_CQ_GET_INST_ID(ev);
2635         index = GET_INDEX(inst_id);
2636         type = GET_TYPE(inst_id);
2637         switch (type) {
2638         case 0:    // ACK
2639             CmiAssert(index>=0 && index<ackPool.size);
2640             CmiAssert(GetIndexType(ackPool, index) == 1);
2641             msg = GetIndexAddress(ackPool, index);
2642 #if PRINT_SYH
2643         printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2644 #endif
2645 #if ! USE_LRTS_MEMPOOL
2646            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2647 #else
2648             DecreaseMsgInSend(msg);
2649 #endif
2650             if(NoMsgInSend(msg))
2651                 buffered_send_msg -= GetMempoolsize(msg);
2652             CmiFree(msg);
2653             IndexPool_freeslot(&ackPool, index);
2654             break;
2655 #if CMK_PERSISTENT_COMM
2656         case 1:  {    // PERSISTENT
2657             CmiLock(persistPool.lock);
2658             CmiAssert(GetIndexType(persistPool, index) == 2);
2659             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2660             CmiUnlock(persistPool.lock);
2661             START_EVENT();
2662             msg = slot->destBuf[0].destAddress;
2663             size = CmiGetMsgSize(msg);
2664             CmiReference(msg);
2665             CMI_CHECK_CHECKSUM(msg, size);
2666             TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2667             handleOneRecvedMsg(size, msg); 
2668             break;
2669             }
2670 #endif
2671         default:
2672             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2673             CmiAbort("PumpRemoteTransactions: unknown type");
2674         }
2675     }
2676     if(status == GNI_RC_ERROR_RESOURCE)
2677     {
2678         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2679     }
2680 }
2681 #endif
2682
2683 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2684 {
2685     gni_cq_entry_t          ev;
2686     gni_return_t            status;
2687     uint64_t                type, inst_id;
2688     gni_post_descriptor_t   *tmp_pd;
2689     MSG_LIST                *ptr;
2690     CONTROL_MSG             *ack_msg_tmp;
2691     ACK_MSG                 *ack_msg;
2692     uint8_t                 msg_tag;
2693 #if CMK_DIRECT
2694     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2695 #endif
2696     SMSG_QUEUE         *queue = &smsg_queue;
2697
2698     while(1) {
2699         CMI_GNI_LOCK(my_cq_lock) 
2700         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2701         CMI_GNI_UNLOCK(my_cq_lock)
2702         if(status != GNI_RC_SUCCESS) break;
2703         
2704         type = GNI_CQ_GET_TYPE(ev);
2705         if (type == GNI_CQ_EVENT_TYPE_POST)
2706         {
2707
2708 #if CMI_EXERT_RDMA_CAP
2709             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2710             RDMA_pending--;
2711 #endif
2712             inst_id     = GNI_CQ_GET_INST_ID(ev);
2713 #if PRINT_SYH
2714             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2715 #endif
2716             CMI_GNI_LOCK(my_cq_lock)
2717             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2718             CMI_GNI_UNLOCK(my_cq_lock)
2719
2720             switch (tmp_pd->type) {
2721 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2722             case GNI_POST_RDMA_PUT:
2723 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2724                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2725 #endif
2726             case GNI_POST_FMA_PUT:
2727                 if(tmp_pd->amo_cmd == 1) {
2728 #if CMK_DIRECT
2729                     //sender ACK to receiver to trigger it is done
2730                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2731                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2732                     msg_tag = DIRECT_PUT_DONE_TAG;
2733 #endif
2734                 }
2735                 else {
2736                     CmiFree((void *)tmp_pd->local_addr);
2737 #if REMOTE_EVENT
2738                     FreePostDesc(tmp_pd);
2739                     continue;
2740 #elif CQWRITE
2741                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2742                     FreePostDesc(tmp_pd);
2743                     continue;
2744 #else
2745                     MallocControlMsg(ack_msg_tmp);
2746                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2747                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2748                     ack_msg_tmp->length  = tmp_pd->length;
2749                     msg_tag = PUT_DONE_TAG;
2750 #endif
2751                 }
2752                 break;
2753 #endif
2754             case GNI_POST_RDMA_GET:
2755             case GNI_POST_FMA_GET:  {
2756 #if  ! USE_LRTS_MEMPOOL
2757                 MallocControlMsg(ack_msg_tmp);
2758                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2759                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2760                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2761                 msg_tag = ACK_TAG;  
2762 #else
2763 #if CMK_WITH_STATS
2764                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2765 #endif
2766                 int seq_id = tmp_pd->cqwrite_value;
2767                 if(seq_id > 0)      // BIG_MSG
2768                 {
2769                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2770                     MallocControlMsg(ack_msg_tmp);
2771                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2772                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2773                     ack_msg_tmp->seq_id = seq_id;
2774                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2775                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2776                     ack_msg_tmp->length = tmp_pd->length;
2777                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2778                     msg_tag = BIG_MSG_TAG; 
2779                 } 
2780                 else
2781                 {
2782                     msg_tag = ACK_TAG; 
2783 #if  !REMOTE_EVENT && !CQWRITE
2784                     MallocAckMsg(ack_msg);
2785                     ack_msg->source_addr = tmp_pd->remote_addr;
2786 #endif
2787                 }
2788 #endif
2789                 break;
2790             }
2791             case  GNI_POST_CQWRITE:
2792                    FreePostDesc(tmp_pd);
2793                    continue;
2794             default:
2795                 CmiPrintf("type=%d\n", tmp_pd->type);
2796                 CmiAbort("PumpLocalTransactions: unknown type!");
2797             }      /* end of switch */
2798
2799 #if CMK_DIRECT
2800             if (tmp_pd->amo_cmd == 1) {
2801                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2802                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2803             }
2804             else
2805 #endif
2806             if (msg_tag == ACK_TAG) {
2807 #if !REMOTE_EVENT
2808 #if   !CQWRITE
2809                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2810                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2811 #else
2812                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2813 #endif
2814 #endif
2815             }
2816             else {
2817                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2818                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2819             }
2820 #if CMK_PERSISTENT_COMM
2821             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2822 #endif
2823             {
2824                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2825 #if PRINT_SYH
2826                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2827 #endif
2828                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2829                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2830
2831                     START_EVENT();
2832                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2833                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2834 #if MACHINE_DEBUG_LOG
2835                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2836                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2837                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2838 #endif
2839                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2840                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2841                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2842                 }else if(msg_tag == BIG_MSG_TAG){
2843                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2844                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2845                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2846                         START_EVENT();
2847 #if PRINT_SYH
2848                         printf("Pipeline msg done [%d]\n", myrank);
2849 #endif
2850 #if                 CMK_SMP_TRACE_COMMTHREAD
2851                         if( tmp_pd->cqwrite_value == 1)
2852                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2853 #endif
2854                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2855                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2856                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2857                     }
2858                 }
2859             }
2860             FreePostDesc(tmp_pd);
2861         }
2862     } //end while
2863     if(status == GNI_RC_ERROR_RESOURCE)
2864     {
2865         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2866         GNI_RC_CHECK("Smsg_tx_cq full", status);
2867     }
2868 }
2869
2870 static void  SendRdmaMsg()
2871 {
2872     gni_return_t            status = GNI_RC_SUCCESS;
2873     gni_mem_handle_t        msg_mem_hndl;
2874     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2875     RDMA_REQUEST            *pre = 0;
2876     uint64_t                register_size = 0;
2877     void                    *msg;
2878     int                     i;
2879 #if CMK_SMP
2880     int len = PCQueueLength(sendRdmaBuf);
2881     for (i=0; i<len; i++)
2882     {
2883 #if CMI_EXERT_RDMA_CAP
2884          if( RDMA_pending >= RDMA_cap) break;
2885 #endif
2886         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2887         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2888         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2889         if (ptr == NULL) break;
2890 #else
2891     ptr = sendRdmaBuf;
2892     while (ptr!=0 )
2893     {
2894 #if CMI_EXERT_RDMA_CAP
2895          if( RDMA_pending >= RDMA_cap) break;
2896 #endif
2897 #endif 
2898         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2899         gni_post_descriptor_t *pd = ptr->pd;
2900         
2901         msg = (void*)(pd->local_addr);
2902         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2903         register_size = 0;
2904         if(pd->cqwrite_value == 0) {
2905             if(NoMsgInRecv(msg))
2906                 register_size = GetMempoolsize(msg);
2907         }
2908
2909         if(status == GNI_RC_SUCCESS)        //mem register good
2910         {
2911             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2912             CMI_GNI_LOCK(lock);
2913 #if REMOTE_EVENT
2914             if( pd->cqwrite_value == 0) {
2915                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2916                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2917                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2918             }
2919 #if CMK_PERSISTENT_COMM
2920             else if (pd->cqwrite_value == PERSIST_SEQ) {
2921                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2922                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, PERSIST_EVENT(ptr->ack_index));
2923                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2924             }
2925 #endif
2926 #endif
2927 #if CMK_WITH_STATS
2928             RDMA_TRY_SEND(pd->type)
2929 #endif
2930             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2931             {
2932                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2933             }
2934             else
2935             {
2936                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2937             }
2938             CMI_GNI_UNLOCK(lock);
2939             
2940             if(status == GNI_RC_SUCCESS)    //post good
2941             {
2942 #if CMI_EXERT_RDMA_CAP
2943                 RDMA_pending ++;
2944 #endif
2945 #if !CMK_SMP
2946                 tmp_ptr = ptr;
2947                 if(pre != 0) {
2948                     pre->next = ptr->next;
2949                 }
2950                 else {
2951                     sendRdmaBuf = ptr->next;
2952                 }
2953                 ptr = ptr->next;
2954                 FreeRdmaRequest(tmp_ptr);
2955 #endif
2956                 if(pd->cqwrite_value == 0)
2957                 {
2958 #if CMK_SMP_TRACE_COMMTHREAD 
2959                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2960 #endif
2961                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2962                 }
2963 #if  CMK_WITH_STATS
2964                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2965                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2966 #endif
2967 #if MACHINE_DEBUG_LOG
2968                 buffered_recv_msg += register_size;
2969                 MACHSTATE(8, "GO request from buffered\n"); 
2970 #endif
2971 #if PRINT_SYH
2972                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
2973 #endif
2974             }else           // cannot post
2975             {
2976 #if CMK_SMP
2977                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2978 #else
2979                 pre = ptr;
2980                 ptr = ptr->next;
2981 #endif
2982 #if PRINT_SYH
2983                 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank, pd->cqwrite_value);
2984 #endif
2985                 break;
2986             }
2987         } else          //memory registration fails
2988         {
2989 #if CMK_SMP
2990             PCQueuePush(sendRdmaBuf, (char*)ptr);
2991 #else
2992             pre = ptr;
2993             ptr = ptr->next;
2994 #endif
2995         }
2996     } //end while
2997 #if ! CMK_SMP
2998     if(ptr == 0)
2999         sendRdmaTail = pre;
3000 #endif
3001 }
3002
3003 // return 1 if all messages are sent
3004 static int SendBufferMsg(SMSG_QUEUE *queue)
3005 {
3006     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3007     CONTROL_MSG         *control_msg_tmp;
3008     gni_return_t        status;
3009     int                 done = 1;
3010     uint64_t            register_size;
3011     void                *register_addr;
3012     int                 index_previous = -1;
3013 #if CMI_EXERT_SEND_CAP
3014     int                 sent_cnt = 0;
3015 #endif
3016
3017 #if CMK_SMP
3018     int          index = 0;
3019 #if ONE_SEND_QUEUE
3020     memset(destpe_avail, 0, mysize * sizeof(char));
3021     for (index=0; index<1; index++)
3022     {
3023         int i, len = PCQueueLength(queue->sendMsgBuf);
3024         for (i=0; i<len; i++) 
3025         {
3026             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3027             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3028             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3029             if(ptr == NULL) break;
3030             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3031                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3032                 continue;
3033             }
3034 #else
3035 #if SMP_LOCKS
3036     int nonempty = PCQueueLength(nonEmptyQueues);
3037     for(index =0; index<nonempty; index++)
3038     {
3039         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
3040         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
3041         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3042         if(current_list == NULL) break; 
3043         PCQueue current_queue= current_list-> sendSmsgBuf;
3044         CmiLock(current_list->lock);
3045         int i, len = PCQueueLength(current_queue);
3046         current_list->pushed = 0;
3047         CmiUnlock(current_list->lock);
3048 #else
3049     for(index =0; index<mysize; index++)
3050     {
3051         //if (index == myrank) continue;
3052         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3053         int i, len = PCQueueLength(current_queue);
3054 #endif
3055         for (i=0; i<len; i++) 
3056         {
3057             CMI_PCQUEUEPOP_LOCK(current_queue)
3058             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3059             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3060             if (ptr == 0) break;
3061 #endif
3062 #else
3063     int index = queue->smsg_head_index;
3064     while(index != -1)
3065     {
3066         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3067         pre = 0;
3068         while(ptr != 0)
3069         {
3070 #endif
3071             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3072             status = GNI_RC_ERROR_RESOURCE;
3073             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
3074                 /* connection not exists yet */
3075             }
3076             else
3077             switch(ptr->tag)
3078             {
3079             case SMALL_DATA_TAG:
3080                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3081                 if(status == GNI_RC_SUCCESS)
3082                 {
3083                     CmiFree(ptr->msg);
3084                 }
3085                 break;
3086             case LMSG_INIT_TAG:
3087                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3088                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
3089                 break;
3090 #if !REMOTE_EVENT && !CQWRITE
3091             case ACK_TAG:
3092                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3093                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3094                 break;
3095 #endif
3096             case BIG_MSG_TAG:
3097                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3098                 if(status == GNI_RC_SUCCESS)
3099                 {
3100                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3101                 }
3102                 break;
3103 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3104             case PUT_DONE_TAG:
3105                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3106                 if(status == GNI_RC_SUCCESS)
3107                 {
3108                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3109                 }
3110                 break;
3111 #endif
3112 #if CMK_DIRECT
3113             case DIRECT_PUT_DONE_TAG:
3114                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3115                 if(status == GNI_RC_SUCCESS)
3116                 {
3117                     free((CMK_DIRECT_HEADER*)ptr->msg);
3118                 }
3119                 break;
3120
3121 #endif
3122             default:
3123                 printf("Weird tag\n");
3124                 CmiAbort("should not happen\n");
3125             }       // end switch
3126             if(status == GNI_RC_SUCCESS)
3127             {
3128 #if PRINT_SYH
3129                 buffered_smsg_counter--;
3130                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3131 #endif
3132 #if !CMK_SMP
3133                 tmp_ptr = ptr;
3134                 if(pre)
3135                 {
3136                     ptr = pre ->next = ptr->next;
3137                 }else
3138                 {
3139                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3140                 }
3141                 FreeMsgList(tmp_ptr);
3142 #else
3143                 FreeMsgList(ptr);
3144 #endif
3145 #if CMI_EXERT_SEND_CAP
3146                 if(++sent_cnt == SEND_CAP) break;
3147 #endif
3148             }else {
3149 #if CMK_SMP
3150 #if ONE_SEND_QUEUE
3151                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3152 #else
3153                 PCQueuePush(current_queue, (char*)ptr);
3154 #endif
3155 #else
3156                 pre = ptr;
3157                 ptr=ptr->next;
3158 #endif
3159                 done = 0;
3160                 if(status == GNI_RC_ERROR_RESOURCE)
3161                 {
3162 #if CMK_SMP && ONE_SEND_QUEUE 
3163                     destpe_avail[ptr->destNode] = 1;
3164 #else
3165                     break;
3166 #endif
3167                 }
3168             } 
3169         } //end while
3170 #if !CMK_SMP
3171         if(ptr == 0)
3172             queue->smsg_msglist_index[index].tail = pre;
3173         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3174         {
3175             if(index_previous != -1)
3176                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3177             else
3178                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3179         }
3180         else {
3181             index_previous = index;
3182         }
3183         index = queue->smsg_msglist_index[index].next;
3184 #else
3185 #if !ONE_SEND_QUEUE && SMP_LOCKS
3186         CmiLock(current_list->lock);
3187         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3188         {
3189             current_list->pushed = 1;
3190             PCQueuePush(nonEmptyQueues, current_list);
3191         }
3192         CmiUnlock(current_list->lock); 
3193 #endif
3194 #endif
3195
3196 #if CMI_EXERT_SEND_CAP
3197         if(sent_cnt == SEND_CAP) {
3198             done = 0;
3199             break;
3200         }
3201 #endif
3202     }   // end pooling for all cores
3203     return done;
3204 }
3205
3206 static void ProcessDeadlock();
3207 void LrtsAdvanceCommunication(int whileidle)
3208 {
3209     static int count = 0;
3210     /*  Receive Msg first */
3211 #if CMK_SMP_TRACE_COMMTHREAD
3212     double startT, endT;
3213 #endif
3214     if (useDynamicSMSG && whileidle)
3215     {
3216 #if CMK_SMP_TRACE_COMMTHREAD
3217         startT = CmiWallTimer();
3218 #endif
3219         PumpDatagramConnection();
3220 #if CMK_SMP_TRACE_COMMTHREAD
3221         endT = CmiWallTimer();
3222         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3223 #endif
3224     }
3225
3226 #if CMK_SMP_TRACE_COMMTHREAD
3227     startT = CmiWallTimer();
3228 #endif
3229     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3230     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3231 #if CMK_SMP_TRACE_COMMTHREAD
3232     endT = CmiWallTimer();
3233     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3234 #endif
3235
3236 #if CMK_SMP_TRACE_COMMTHREAD
3237     startT = CmiWallTimer();
3238 #endif
3239     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3240     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3241 #if CMK_SMP_TRACE_COMMTHREAD
3242     endT = CmiWallTimer();
3243     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3244 #endif
3245
3246 #if CMK_SMP_TRACE_COMMTHREAD
3247     startT = CmiWallTimer();
3248 #endif
3249     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3250
3251 #if CQWRITE
3252     PumpCqWriteTransactions();
3253 #endif
3254
3255 #if REMOTE_EVENT
3256     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
3257 #endif
3258
3259     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3260 #if CMK_SMP_TRACE_COMMTHREAD
3261     endT = CmiWallTimer();
3262     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3263 #endif
3264  
3265 #if CMK_SMP_TRACE_COMMTHREAD
3266     startT = CmiWallTimer();
3267 #endif
3268     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3269     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3270 #if CMK_SMP_TRACE_COMMTHREAD
3271     endT = CmiWallTimer();
3272     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3273 #endif
3274
3275     /* Send buffered Message */
3276 #if CMK_SMP_TRACE_COMMTHREAD
3277     startT = CmiWallTimer();
3278 #endif
3279 #if CMK_USE_OOB
3280     if (SendBufferMsg(&smsg_oob_queue) == 1)
3281 #endif
3282     {
3283         STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
3284     }
3285     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3286 #if CMK_SMP_TRACE_COMMTHREAD
3287     endT = CmiWallTimer();
3288     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3289 #endif
3290
3291 #if CMK_SMP && ! LARGEPAGE
3292     if (_detected_hang)  ProcessDeadlock();
3293 #endif
3294 }
3295
3296 /* useDynamicSMSG */
3297 static void _init_dynamic_smsg()
3298 {
3299     gni_return_t status;
3300     uint32_t     vmdh_index = -1;
3301     int i;
3302
3303     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3304     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3305     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3306     for(i=0; i<mysize; i++) {
3307         smsg_connected_flag[i] = 0;
3308         smsg_attr_vector_local[i] = NULL;
3309         smsg_attr_vector_remote[i] = NULL;
3310     }
3311     if(mysize <=512)
3312     {
3313         SMSG_MAX_MSG = 4096;
3314     }else if (mysize <= 4096)
3315     {
3316         SMSG_MAX_MSG = 4096/mysize * 1024;
3317     }else if (mysize <= 16384)
3318     {
3319         SMSG_MAX_MSG = 512;
3320     }else {
3321         SMSG_MAX_MSG = 256;
3322     }
3323
3324     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3325     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3326     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3327     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3328     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3329
3330     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3331     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3332     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3333     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3334     mailbox_list->offset = 0;
3335     mailbox_list->next = 0;
3336     
3337     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3338         mailbox_list->size, smsg_rx_cqh,
3339         GNI_MEM_READWRITE,   
3340         vmdh_index,
3341         &(mailbox_list->mem_hndl));
3342     GNI_RC_CHECK("MEMORY registration for smsg", status);
3343
3344     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3345     GNI_RC_CHECK("Unbound EP", status);
3346     
3347     alloc_smsg_attr(&send_smsg_attr);
3348
3349     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3350     GNI_RC_CHECK("post unbound datagram", status);
3351
3352       /* always pre-connect to proc 0 */
3353     //if (myrank != 0) connect_to(0);
3354 }
3355
3356 static void _init_static_smsg()
3357 {
3358     gni_smsg_attr_t      *smsg_attr;
3359     gni_smsg_attr_t      remote_smsg_attr;
3360     gni_smsg_attr_t      *smsg_attr_vec;
3361     gni_mem_handle_t     my_smsg_mdh_mailbox;
3362     int      ret, i;
3363     gni_return_t status;
3364     uint32_t              vmdh_index = -1;
3365     mdh_addr_t            base_infor;
3366     mdh_addr_t            *base_addr_vec;
3367     char *env;
3368
3369     if(mysize <=512)
3370     {
3371         SMSG_MAX_MSG = 1024;
3372     }else if (mysize <= 4096)
3373     {
3374         SMSG_MAX_MSG = 1024;
3375     }else if (mysize <= 16384)
3376     {
3377         SMSG_MAX_MSG = 512;
3378     }else {
3379         SMSG_MAX_MSG = 256;
3380     }
3381     
3382     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3383     if (env) SMSG_MAX_MSG = atoi(env);
3384     CmiAssert(SMSG_MAX_MSG > 0);
3385
3386     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3387     
3388     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3389     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3390     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3391     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3392     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3393     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3394     CmiAssert(ret == 0);
3395     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3396     
3397     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3398             smsg_memlen*(mysize), smsg_rx_cqh,
3399             GNI_MEM_READWRITE,   
3400             vmdh_index,
3401             &my_smsg_mdh_mailbox);
3402     register_memory_size += smsg_memlen*(mysize);
3403     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3404
3405     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3406     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3407
3408     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3409     base_infor.mdh =  my_smsg_mdh_mailbox;
3410     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3411
3412     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3413  
3414     for(i=0; i<mysize; i++)
3415     {
3416         if(i==myrank)
3417             continue;
3418         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3419         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3420         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3421         smsg_attr[i].mbox_offset = i*smsg_memlen;
3422         smsg_attr[i].buff_size = smsg_memlen;
3423         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3424         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3425     }
3426
3427     for(i=0; i<mysize; i++)
3428     {
3429         if (myrank == i) continue;
3430
3431         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3432         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3433         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3434         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3435         remote_smsg_attr.buff_size = smsg_memlen;
3436         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3437         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3438
3439         /* initialize the smsg channel */
3440         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3441         GNI_RC_CHECK("SMSG Init", status);
3442     } //end initialization
3443
3444     free(base_addr_vec);
3445     free(smsg_attr);
3446
3447     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3448     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3449
3450
3451 inline
3452 static void _init_send_queue(SMSG_QUEUE *queue)
3453 {
3454      int i;
3455 #if ONE_SEND_QUEUE
3456      queue->sendMsgBuf = PCQueueCreate();
3457      destpe_avail = (char*)malloc(mysize * sizeof(char));
3458 #else
3459      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3460 #if CMK_SMP && SMP_LOCKS
3461      nonEmptyQueues = PCQueueCreate();
3462 #endif
3463      for(i =0; i<mysize; i++)
3464      {
3465 #if CMK_SMP
3466          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3467 #if SMP_LOCKS
3468          queue->smsg_msglist_index[i].pushed = 0;
3469          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3470 #endif
3471 #else
3472          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3473          queue->smsg_msglist_index[i].next = -1;
3474          queue->smsg_head_index = -1;
3475 #endif
3476         
3477      }
3478 #endif
3479 }
3480
3481 inline
3482 static void _init_smsg()
3483 {
3484     if(mysize > 1) {
3485         if (useDynamicSMSG)
3486             _init_dynamic_smsg();
3487         else
3488             _init_static_smsg();
3489     }
3490
3491     _init_send_queue(&smsg_queue);
3492 #if CMK_USE_OOB
3493     _init_send_queue(&smsg_oob_queue);
3494 #endif
3495 }
3496
3497 static void _init_static_msgq()
3498 {
3499     gni_return_t status;
3500     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3501     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3502     msgq_attrs.smsg_q_sz = 1;
3503     msgq_attrs.rcv_pool_sz = 1;
3504     msgq_attrs.num_msgq_eps = 2;
3505     msgq_attrs.nloc_insts = 8;
3506     msgq_attrs.modes = 0;
3507     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3508
3509     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3510     GNI_RC_CHECK("MSGQ Init", status);
3511
3512
3513 }
3514
3515
3516 static CmiUInt8 total_mempool_size = 0;
3517 static CmiUInt8 total_mempool_calls = 0;
3518
3519 #if USE_LRTS_MEMPOOL
3520 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3521 {
3522     void *pool;
3523     int ret;
3524     gni_return_t status = GNI_RC_SUCCESS;
3525
3526     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3527     if (*size < default_size) *size = default_size;
3528 #if LARGEPAGE
3529     // round up to be multiple of _tlbpagesize
3530     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3531     *size = ALIGNHUGEPAGE(*size);
3532 #endif
3533     total_mempool_size += *size;
3534     total_mempool_calls += 1;
3535 #if   !LARGEPAGE
3536     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3537     {
3538         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3539         CmiAbort("alloc_mempool_block");
3540     }
3541 #endif
3542 #if LARGEPAGE
3543     pool = my_get_huge_pages(*size);
3544     ret = pool==NULL;
3545 #else
3546     ret = posix_memalign(&pool, ALIGNBUF, *size);
3547 #endif
3548     if (ret != 0) {
3549 #if CMK_SMP && STEAL_MEMPOOL
3550       pool = steal_mempool_block(size, mem_hndl);
3551       if (pool != NULL) return pool;
3552 #endif
3553       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3554       if (ret == ENOMEM)
3555         CmiAbort("alloc_mempool_block: out of memory.");
3556       else
3557         CmiAbort("alloc_mempool_block: posix_memalign failed");
3558     }
3559 #if LARGEPAGE
3560     CmiMemLock();
3561     register_count++;
3562     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3563     CmiMemUnlock();
3564     if(status != GNI_RC_SUCCESS) {
3565         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3566 sweep_mempool(CpvAccess(mempool));
3567     }
3568     GNI_RC_CHECK("MEMORY_REGISTER", status);
3569 #else
3570     SetMemHndlZero((*mem_hndl));
3571 #endif
3572     return pool;
3573 }
3574
3575 // ptr is a block head pointer
3576 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3577 {
3578     if(!(IsMemHndlZero(mem_hndl)))
3579     {
3580         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3581     }
3582 #if LARGEPAGE
3583     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3584 #else
3585     free(ptr);
3586 #endif
3587 }
3588 #endif
3589
3590 void LrtsPreCommonInit(int everReturn){
3591 #if USE_LRTS_MEMPOOL
3592     CpvInitialize(mempool_type*, mempool);
3593     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3594     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3595 #endif
3596 }
3597
3598 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3599 {
3600     register int            i;
3601     int                     rc;
3602     int                     device_id = 0;
3603     unsigned int            remote_addr;
3604     gni_cdm_handle_t        cdm_hndl;
3605     gni_return_t            status = GNI_RC_SUCCESS;
3606     uint32_t                vmdh_index = -1;
3607     uint8_t                 ptag;
3608     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3609     int                     first_spawned;
3610     int                     physicalID;
3611     char                   *env;
3612
3613     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3614     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3615     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3616    
3617     status = PMI_Init(&first_spawned);
3618     GNI_RC_CHECK("PMI_Init", status);
3619
3620     status = PMI_Get_size(&mysize);
3621     GNI_RC_CHECK("PMI_Getsize", status);
3622
3623     status = PMI_Get_rank(&myrank);
3624     GNI_RC_CHECK("PMI_getrank", status);
3625
3626     //physicalID = CmiPhysicalNodeID(myrank);
3627     
3628     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3629
3630     *myNodeID = myrank;
3631     *numNodes = mysize;
3632   
3633 #if MULTI_THREAD_SEND
3634     /* Currently, we only consider the case that comm. thread will only recv msgs */
3635     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3636 #endif
3637
3638     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3639     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3640     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3641
3642     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3643     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3644     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3645
3646     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3647     if (env) useDynamicSMSG = 1;
3648     if (!useDynamicSMSG)
3649       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3650     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3651     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3652     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3653     
3654     if(myrank == 0)
3655     {
3656         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3657         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3658     }
3659 #ifdef USE_ONESIDED
3660     onesided_init(NULL, &onesided_hnd);
3661
3662     // this is a GNI test, so use the libonesided bypass functionality
3663     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3664     local_addr = gniGetNicAddress();
3665 #else
3666     ptag = get_ptag();
3667     cookie = get_cookie();
3668 #if 0
3669     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3670 #endif
3671     //Create and attach to the communication  domain */
3672     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3673     GNI_RC_CHECK("GNI_CdmCreate", status);
3674     //* device id The device id is the minor number for the device
3675     //that is assigned to the device by the system when the device is created.
3676     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3677     //where X is the device number 0 default 
3678     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3679     GNI_RC_CHECK("GNI_CdmAttach", status);
3680     local_addr = get_gni_nic_address(0);
3681 #endif
3682     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3683     _MEMCHECK(MPID_UGNI_AllAddr);
3684     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3685     /* create the local completion queue */
3686     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3687     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3688     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3689     
3690     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3691     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3692     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3693
3694     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3695     GNI_RC_CHECK("Create CQ (rx)", status);
3696     
3697     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3698     GNI_RC_CHECK("Create Post CQ (rx)", status);
3699     
3700     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3701     //GNI_RC_CHECK("Create BTE CQ", status);
3702
3703     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3704     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3705     _MEMCHECK(ep_hndl_array);
3706 #if MULTI_THREAD_SEND 
3707     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3708     //default_tx_cq_lock = CmiCreateLock();
3709     rdma_tx_cq_lock = CmiCreateLock();
3710     smsg_rx_cq_lock = CmiCreateLock();
3711     //global_gni_lock  = CmiCreateLock();
3712     //rx_cq_lock  = CmiCreateLock();
3713 #endif
3714     for (i=0; i<mysize; i++) {
3715         if(i == myrank) continue;
3716         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3717         GNI_RC_CHECK("GNI_EpCreate ", status);   
3718         remote_addr = MPID_UGNI_AllAddr[i];
3719         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3720         GNI_RC_CHECK("GNI_EpBind ", status);   
3721     }
3722
3723     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3724     _init_smsg();
3725     PMI_Barrier();
3726
3727 #if     USE_LRTS_MEMPOOL
3728     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3729     if (env) {
3730         _totalmem = CmiReadSize(env);
3731         if (myrank == 0)
3732             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3733     }
3734
3735     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3736     if (env) _mempool_size = CmiReadSize(env);
3737     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3738         _mempool_size = CmiReadSize(env);
3739
3740
3741     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3742     if (env) {
3743         MAX_REG_MEM = CmiReadSize(env);
3744         user_set_flag = 1;
3745     }
3746     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3747         MAX_REG_MEM = CmiReadSize(env);
3748         user_set_flag = 1;
3749     }
3750
3751     env = getenv("CHARM_UGNI_SEND_MAX");
3752     if (env) {
3753         MAX_BUFF_SEND = CmiReadSize(env);
3754         user_set_flag = 1;
3755     }
3756     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3757         MAX_BUFF_SEND = CmiReadSize(env);
3758         user_set_flag = 1;
3759     }
3760
3761     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3762     if (env) {
3763         _mempool_size_limit = CmiReadSize(env);
3764     }
3765
3766     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3767     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3768
3769     if (myrank==0) {
3770         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3771         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3772         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3773             /* memblock can expand to BIG_MSG * 2 size */
3774             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3775             CmiAbort("mempool maximum size is too small. \n");
3776         }
3777 #if MULTI_THREAD_SEND
3778         printf("Charm++> worker thread sending messages\n");
3779 #elif COMM_THREAD_SEND
3780         printf("Charm++> only comm thread send/recv messages\n");
3781 #endif
3782     }
3783
3784 #endif     /* end of USE_LRTS_MEMPOOL */
3785
3786     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3787     if (env) {
3788         BIG_MSG = CmiReadSize(env);
3789         if (BIG_MSG < ONE_SEG)
3790           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3791     }
3792     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3793     if (env) {
3794         BIG_MSG_PIPELINE = atoi(env);
3795     }
3796
3797     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3798     if (env) _checkProgress = 0;
3799     if (mysize == 1) _checkProgress = 0;
3800
3801 #if CMI_EXERT_RDMA_CAP
3802     env = getenv("CHARM_UGNI_RDMA_MAX");
3803     if (env)  {
3804         RDMA_pending = atoi(env);
3805         if (myrank == 0)
3806             printf("Charm++> Max pending RDMA set to: %d\n", RDMA_pending);
3807     }
3808 #endif
3809     
3810     /*
3811     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3812     if (env) 
3813         _tlbpagesize = CmiReadSize(env);
3814     */
3815     /* real gethugepagesize() is only available when hugetlb module linked */
3816     _tlbpagesize = gethugepagesize();
3817     if (myrank == 0) {
3818         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3819     }
3820
3821 #if LARGEPAGE
3822     if (_tlbpagesize == 4096) {
3823         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3824     }
3825 #endif
3826
3827     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3828     
3829     stats_off = CmiGetArgFlag(*argv, "+stats_off");
3830
3831     /* init DMA buffer for medium message */
3832
3833     //_init_DMA_buffer();
3834     
3835     free(MPID_UGNI_AllAddr);
3836 #if CMK_SMP
3837     sendRdmaBuf = PCQueueCreate();
3838 #else
3839     sendRdmaBuf = 0;
3840 #endif
3841 #if MACHINE_DEBUG_LOG
3842     char ln[200];
3843     sprintf(ln,"debugLog.%d",myrank);
3844     debugLog=fopen(ln,"w");
3845 #endif
3846
3847 //    NTK_Init();
3848 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3849
3850 #if  REMOTE_EVENT
3851     IndexPool_init(&ackPool);
3852 #if CMK_PERSISTENT_COMM
3853     IndexPool_init(&persistPool);
3854 #endif
3855     SHIFT = 1;
3856     while (1<<SHIFT < mysize) SHIFT++;
3857     CmiAssert(SHIFT < 31);
3858 #endif
3859
3860 #if CMK_WITH_STATS
3861     init_comm_stats();
3862 #endif
3863 }
3864
3865 void* LrtsAlloc(int n_bytes, int header)
3866 {
3867     void *ptr = NULL;
3868 #if 0
3869     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3870 #endif
3871     if(n_bytes <= SMSG_MAX_MSG)
3872     {
3873         int totalsize = n_bytes+header;
3874         ptr = malloc(totalsize);
3875     }
3876     else {
3877         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3878 #if     USE_LRTS_MEMPOOL
3879         n_bytes = ALIGN64(n_bytes);
3880         if(n_bytes < BIG_MSG)
3881         {
3882             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3883             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3884         }else 
3885         {
3886 #if LARGEPAGE
3887             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3888             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3889             char *res = my_get_huge_pages(n_bytes);
3890 #else
3891             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3892 #endif
3893             if (res) ptr = res + ALIGNBUF - header;
3894         }
3895 #else
3896         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3897         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3898         ptr = res + ALIGNBUF - header;
3899 #endif
3900     }
3901 #if CMK_PERSISTENT_COMM
3902     if (ptr) SetMemHndlZero(MEMHFIELD((char*)ptr+header));
3903 #endif
3904     return ptr;
3905 }
3906
3907 void  LrtsFree(void *msg)
3908 {
3909     CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3910 #if CMK_PERSISTENT_COMM
3911     if (!IsMemHndlZero(MEMHFIELD((char*)msg+sizeof(CmiChunkHeader)))) return;
3912 #endif
3913     if (size <= SMSG_MAX_MSG)
3914         free(msg);
3915     else {
3916         size = ALIGN64(size);
3917         if(size>=BIG_MSG)
3918         {
3919 #if LARGEPAGE
3920             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3921             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3922 #else
3923             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3924 #endif
3925         }
3926         else {
3927 #if    USE_LRTS_MEMPOOL
3928 #if CMK_SMP
3929             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3930 #else
3931             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3932 #endif
3933 #else
3934             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3935 #endif
3936         }
3937     }
3938 }
3939
3940 void LrtsExit()
3941 {
3942 #if CMK_WITH_STATS
3943 #if CMK_SMP
3944     if(CmiMyRank() == CmiMyNodeSize())
3945 #endif
3946     if (print_stats) print_comm_stats();
3947 #endif
3948     /* free memory ? */
3949 #if USE_LRTS_MEMPOOL
3950     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
3951     mempool_destroy(CpvAccess(mempool));
3952 #endif
3953     PMI_Finalize();
3954     exit(0);
3955 }
3956
3957 void LrtsDrainResources()
3958 {
3959     if(mysize == 1) return;
3960     while (
3961 #if CMK_USE_OOB
3962            !SendBufferMsg(&smsg_oob_queue) ||
3963 #endif
3964            !SendBufferMsg(&smsg_queue) 
3965           )
3966     {
3967         if (useDynamicSMSG)
3968             PumpDatagramConnection();
3969         PumpNetworkSmsg();
3970         PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3971         PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
3972 #if REMOTE_EVENT
3973         PumpRemoteTransactions();
3974 #endif
3975         SendRdmaMsg();
3976     }
3977     PMI_Barrier();
3978 }
3979
3980 void LrtsAbort(const char *message) {
3981     fprintf(stderr, "[%d] CmiAbort: %s\n", myrank, message);
3982     CmiPrintStackTrace(0);
3983     PMI_Abort(-1, message);
3984 }
3985
3986 /**************************  TIMER FUNCTIONS **************************/
3987 #if CMK_TIMER_USE_SPECIAL
3988 /* MPI calls are not threadsafe, even the timer on some machines */
3989 static CmiNodeLock  timerLock = 0;
3990 static int _absoluteTime = 0;
3991 static int _is_global = 0;
3992 static struct timespec start_ts;
3993
3994 inline int CmiTimerIsSynchronized() {
3995     return 0;
3996 }
3997
3998 inline int CmiTimerAbsolute() {
3999     return _absoluteTime;
4000 }
4001
4002 double CmiStartTimer() {
4003     return 0.0;
4004 }
4005
4006 double CmiInitTime() {
4007     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
4008 }
4009
4010 void CmiTimerInit(char **argv) {
4011     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
4012     if (_absoluteTime && CmiMyPe() == 0)
4013         printf("Charm++> absolute  timer is used\n");
4014     
4015     _is_global = CmiTimerIsSynchronized();
4016
4017
4018     if (_is_global) {
4019         if (CmiMyRank() == 0) {
4020             clock_gettime(CLOCK_MONOTONIC, &start_ts);
4021         }
4022     } else { /* we don't have a synchronous timer, set our own start time */
4023         CmiBarrier();
4024         CmiBarrier();
4025         CmiBarrier();
4026         clock_gettime(CLOCK_MONOTONIC, &start_ts);
4027     }
4028     CmiNodeAllBarrier();          /* for smp */
4029 }
4030
4031 /**
4032  * Since the timerLock is never created, and is
4033  * always NULL, then all the if-condition inside
4034  * the timer functions could be disabled right
4035  * now in the case of SMP.
4036  */
4037 double CmiTimer(void) {
4038     struct timespec now_ts;
4039     clock_gettime(CLOCK_MONOTONIC, &now_ts);
4040     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4041         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
4042 }
4043
4044 double CmiWallTimer(void) {
4045     struct timespec now_ts;
4046     clock_gettime(CLOCK_MONOTONIC, &now_ts);
4047     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4048         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
4049 }
4050
4051 double CmiCpuTimer(void) {
4052     struct timespec now_ts;
4053     clock_gettime(CLOCK_MONOTONIC, &now_ts);
4054     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4055         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
4056 }
4057
4058 #endif
4059 /************Barrier Related Functions****************/
4060
4061 int CmiBarrier()
4062 {
4063     gni_return_t status;
4064
4065 #if CMK_SMP
4066     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
4067     CmiNodeAllBarrier();
4068     if (CmiMyRank() == CmiMyNodeSize())
4069 #else
4070     if (CmiMyRank() == 0)
4071 #endif
4072     {
4073         /**
4074          *  The call of CmiBarrier is usually before the initialization
4075          *  of trace module of Charm++, therefore, the START_EVENT
4076          *  and END_EVENT are disabled here. -Chao Mei
4077          */
4078         /*START_EVENT();*/
4079         status = PMI_Barrier();
4080         GNI_RC_CHECK("PMI_Barrier", status);
4081         /*END_EVENT(10);*/
4082     }
4083     CmiNodeAllBarrier();
4084     return 0;
4085
4086 }
4087 #if CMK_DIRECT
4088 #include "machine-cmidirect.c"
4089 #endif