1b218a070a56684ef31b5c48f597b2c6474e0019
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/dir.h>
38 #include <sys/stat.h>
39 #include <gni_pub.h>
40 #include <pmi.h>
41 //#include <numatoolkit.h>
42
43 #include "converse.h"
44
45 #if CMK_DIRECT
46 #include "cmidirect.h"
47 #endif
48
49 #define     LARGEPAGE              0
50
51 #if CMK_SMP
52 #define MULTI_THREAD_SEND          0
53 #define COMM_THREAD_SEND           1
54 #endif
55
56 #if MULTI_THREAD_SEND
57 #define CMK_WORKER_SINGLE_TASK     0
58 #endif
59
60 #define REMOTE_EVENT               0
61 #define CQWRITE                    0
62
63 #define CMI_EXERT_SEND_CAP      0
64 #define CMI_EXERT_RECV_CAP      0
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP 16
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP 2
72 #endif
73
74 #define USE_LRTS_MEMPOOL                  1
75
76 #define PRINT_SYH                         0
77
78 // Trace communication thread
79 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
80 #define TRACE_THRESHOLD     0.00005
81 #define CMI_MPI_TRACE_MOREDETAILED 0
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
84 #else
85 #undef CMK_SMP_TRACE_COMMTHREAD
86 #define CMK_SMP_TRACE_COMMTHREAD 0
87 #endif
88
89 #define CMK_TRACE_COMMOVERHEAD 0
90 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
91 #undef CMI_MPI_TRACE_USEREVENTS
92 #define CMI_MPI_TRACE_USEREVENTS 1
93 #else
94 #undef CMK_TRACE_COMMOVERHEAD
95 #define CMK_TRACE_COMMOVERHEAD 0
96 #endif
97
98 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
99 CpvStaticDeclare(double, projTraceStart);
100 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
101 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
102 #else
103 #define  START_EVENT()
104 #define  END_EVENT(x)
105 #endif
106
107 #if USE_LRTS_MEMPOOL
108
109 #define oneMB (1024ll*1024)
110 #define oneGB (1024ll*1024*1024)
111
112 static CmiInt8 _mempool_size = 8*oneMB;
113 static CmiInt8 _expand_mem =  4*oneMB;
114 static CmiInt8 _mempool_size_limit = 0;
115
116 static CmiInt8 _totalmem = 0.8*oneGB;
117
118 #if LARGEPAGE
119 static CmiInt8 BIG_MSG  =  16*oneMB;
120 static CmiInt8 ONE_SEG  =  4*oneMB;
121 #else
122 static CmiInt8 BIG_MSG  =  4*oneMB;
123 static CmiInt8 ONE_SEG  =  2*oneMB;
124 #endif
125 #if MULTI_THREAD_SEND
126 static int BIG_MSG_PIPELINE = 1;
127 #else
128 static int BIG_MSG_PIPELINE = 4;
129 #endif
130
131 // dynamic flow control
132 static CmiInt8 buffered_send_msg = 0;
133 static CmiInt8 register_memory_size = 0;
134
135 #if LARGEPAGE
136 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
137 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
138 static CmiInt8  register_count = 0;
139 #else
140 #if CMK_SMP && COMM_THREAD_SEND 
141 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
142 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
143 #else
144 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
145 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
146 #endif
147
148
149 #endif
150
151 #endif     /* end USE_LRTS_MEMPOOL */
152
153 #if MULTI_THREAD_SEND 
154 #define     CMI_GNI_LOCK(x)       CmiLock(x);
155 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
156 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
157 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
158 #else
159 #define     CMI_GNI_LOCK(x)
160 #define     CMI_GNI_UNLOCK(x)
161 #define     CMI_PCQUEUEPOP_LOCK(Q)   
162 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
163 #endif
164
165 static int _tlbpagesize = 4096;
166
167 //static int _smpd_count  = 0;
168
169 static int   user_set_flag  = 0;
170
171 static int _checkProgress = 1;             /* check deadlock */
172 static int _detected_hang = 0;
173
174 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
175
176 // dynamic SMSG
177 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
178
179 static int avg_smsg_connection = 32;
180 static int                 *smsg_connected_flag= 0;
181 static gni_smsg_attr_t     **smsg_attr_vector_local;
182 static gni_smsg_attr_t     **smsg_attr_vector_remote;
183 static gni_ep_handle_t     ep_hndl_unbound;
184 static gni_smsg_attr_t     send_smsg_attr;
185 static gni_smsg_attr_t     recv_smsg_attr;
186
187 typedef struct _dynamic_smsg_mailbox{
188    void     *mailbox_base;
189    int      size;
190    int      offset;
191    gni_mem_handle_t  mem_hndl;
192    struct      _dynamic_smsg_mailbox  *next;
193 }dynamic_smsg_mailbox_t;
194
195 static dynamic_smsg_mailbox_t  *mailbox_list;
196
197 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
198 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
199
200 #if PRINT_SYH
201 int         lrts_send_msg_id = 0;
202 int         lrts_local_done_msg = 0;
203 int         lrts_send_rdma_success = 0;
204 #endif
205
206 #include "machine.h"
207
208 #include "pcqueue.h"
209
210 #include "mempool.h"
211
212 #if CMK_PERSISTENT_COMM
213 #include "machine-persistent.h"
214 #endif
215
216 //#define  USE_ONESIDED 1
217 #ifdef USE_ONESIDED
218 //onesided implementation is wrong, since no place to restore omdh
219 #include "onesided.h"
220 onesided_hnd_t   onesided_hnd;
221 onesided_md_t    omdh;
222 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
223
224 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
225
226 #else
227 uint8_t   onesided_hnd, omdh;
228
229 #if REMOTE_EVENT || CQWRITE 
230 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
231     if(register_memory_size+size>= MAX_REG_MEM) { \
232         status = GNI_RC_ERROR_NOMEM;} \
233     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
234         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
235 #else
236 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
237         if (register_memory_size + size >= MAX_REG_MEM) { \
238             status = GNI_RC_ERROR_NOMEM; \
239         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
240             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
241 #endif
242
243 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
244     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
245              register_memory_size -= size; \
246          else CmiAbort("MEM_DEregister");  \
247     } while (0)
248 #endif
249
250 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
251 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
252 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
253 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
254 #define   IncreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)++
255 #define   DecreaseMsgInRecv(x)    (GetMempoolBlockPtr(x)->msgs_in_recv)--
256 #define   IncreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)++
257 #define   DecreaseMsgInSend(x)    (GetMempoolBlockPtr(x)->msgs_in_send)--
258 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
259 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
260 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
261 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
262 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
263 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
264
265 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
266 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
267
268 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
269 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
270 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
271 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
272
273 #define ALIGNBUF                64
274
275 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
276 /* If SMSG is not used */
277
278 #define FMA_PER_CORE  1024
279 #define FMA_BUFFER_SIZE 1024
280
281 /* If SMSG is used */
282 static int  SMSG_MAX_MSG = 1024;
283 #define SMSG_MAX_CREDIT 72 
284
285 #define MSGQ_MAXSIZE       2048
286 /* large message transfer with FMA or BTE */
287 #define LRTS_GNI_RDMA_THRESHOLD  1024 
288
289 #if CMK_SMP
290 static int  REMOTE_QUEUE_ENTRIES=163840; 
291 static int LOCAL_QUEUE_ENTRIES=163840; 
292 #else
293 static int  REMOTE_QUEUE_ENTRIES=20480;
294 static int LOCAL_QUEUE_ENTRIES=20480; 
295 #endif
296
297 #define BIG_MSG_TAG             0x26
298 #define PUT_DONE_TAG            0x28
299 #define DIRECT_PUT_DONE_TAG     0x29
300 #define ACK_TAG                 0x30
301 /* SMSG is data message */
302 #define SMALL_DATA_TAG          0x31
303 /* SMSG is a control message to initialize a BTE */
304 #define LMSG_INIT_TAG           0x39 
305
306 #define DEBUG
307 #ifdef GNI_RC_CHECK
308 #undef GNI_RC_CHECK
309 #endif
310 #ifdef DEBUG
311 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
312 #else
313 #define GNI_RC_CHECK(msg,rc)
314 #endif
315
316 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
317 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
318 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
319
320 static int useStaticMSGQ = 0;
321 static int useStaticFMA = 0;
322 static int mysize, myrank;
323 static gni_nic_handle_t   nic_hndl;
324
325 typedef struct {
326     gni_mem_handle_t mdh;
327     uint64_t addr;
328 } mdh_addr_t ;
329 // this is related to dynamic SMSG
330
331 typedef struct mdh_addr_list{
332     gni_mem_handle_t mdh;
333    void *addr;
334     struct mdh_addr_list *next;
335 }mdh_addr_list_t;
336
337 static unsigned int         smsg_memlen;
338 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
339 mdh_addr_t          setup_mem;
340 mdh_addr_t          *smsg_connection_vec = 0;
341 gni_mem_handle_t    smsg_connection_memhndl;
342 static int          smsg_expand_slots = 10;
343 static int          smsg_available_slot = 0;
344 static void         *smsg_mailbox_mempool = 0;
345 mdh_addr_list_t     *smsg_dynamic_list = 0;
346
347 static void             *smsg_mailbox_base;
348 gni_msgq_attr_t         msgq_attrs;
349 gni_msgq_handle_t       msgq_handle;
350 gni_msgq_ep_attr_t      msgq_ep_attrs;
351 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
352
353 /* =====Beginning of Declarations of Machine Specific Variables===== */
354 static int cookie;
355 static int modes = 0;
356 static gni_cq_handle_t       smsg_rx_cqh = NULL;
357 static gni_cq_handle_t       default_tx_cqh = NULL;
358 static gni_cq_handle_t       rdma_tx_cqh = NULL;
359 static gni_cq_handle_t       rdma_rx_cqh = NULL;
360 static gni_cq_handle_t       post_tx_cqh = NULL;
361 static gni_ep_handle_t       *ep_hndl_array;
362
363 static CmiNodeLock           *ep_lock_array;
364 static CmiNodeLock           default_tx_cq_lock; 
365 static CmiNodeLock           rdma_tx_cq_lock; 
366 static CmiNodeLock           global_gni_lock; 
367 static CmiNodeLock           rx_cq_lock;
368 static CmiNodeLock           smsg_mailbox_lock;
369 static CmiNodeLock           smsg_rx_cq_lock;
370 static CmiNodeLock           *mempool_lock;
371 //#define     CMK_WITH_STATS      0
372 typedef struct msg_list
373 {
374     uint32_t destNode;
375     uint32_t size;
376     void *msg;
377     uint8_t tag;
378 #if !CMK_SMP
379     struct msg_list *next;
380 #endif
381 #if CMK_WITH_STATS
382     double  creation_time;
383 #endif
384 }MSG_LIST;
385
386
387 typedef struct control_msg
388 {
389     uint64_t            source_addr;    /* address from the start of buffer  */
390     uint64_t            dest_addr;      /* address from the start of buffer */
391     int                 total_length;   /* total length */
392     int                 length;         /* length of this packet */
393 #if REMOTE_EVENT
394     int                 ack_index;      /* index from integer to address */
395 #endif
396     uint8_t             seq_id;         //big message   0 meaning single message
397     gni_mem_handle_t    source_mem_hndl;
398     struct control_msg *next;
399 } CONTROL_MSG;
400
401 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
402
403 typedef struct ack_msg
404 {
405     uint64_t            source_addr;    /* address from the start of buffer  */
406 #if ! USE_LRTS_MEMPOOL
407     gni_mem_handle_t    source_mem_hndl;
408     int                 length;          /* total length */
409 #endif
410     struct ack_msg     *next;
411 } ACK_MSG;
412
413 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
414
415 #if CMK_DIRECT
416 typedef struct{
417     uint64_t    handler_addr;
418 }CMK_DIRECT_HEADER;
419
420 typedef struct {
421     char core[CmiMsgHeaderSizeBytes];
422     uint64_t handler;
423 }cmidirectMsg;
424
425 //SYH
426 CpvDeclare(int, CmiHandleDirectIdx);
427 void CmiHandleDirectMsg(cmidirectMsg* msg)
428 {
429
430     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
431    (*(_handle->callbackFnPtr))(_handle->callbackData);
432    CmiFree(msg);
433 }
434
435 void CmiDirectInit()
436 {
437     CpvInitialize(int,  CmiHandleDirectIdx);
438     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
439 }
440
441 #endif
442 typedef struct  rmda_msg
443 {
444     int                   destNode;
445 #if REMOTE_EVENT
446     int                   ack_index;
447 #endif
448     gni_post_descriptor_t *pd;
449 #if !CMK_SMP
450     struct  rmda_msg      *next;
451 #endif
452 }RDMA_REQUEST;
453
454
455 #if CMK_SMP
456 #define SMP_LOCKS               0
457 #define ONE_SEND_QUEUE                  0
458 PCQueue sendRdmaBuf;
459 typedef struct  msg_list_index
460 {
461     PCQueue     sendSmsgBuf;
462     int         pushed;
463     CmiNodeLock   lock;
464 } MSG_LIST_INDEX;
465 char                *destpe_avail;
466 #if  !ONE_SEND_QUEUE && SMP_LOCKS
467     PCQueue     nonEmptyQueues;
468 #endif
469 #else         /* non-smp */
470
471 static RDMA_REQUEST        *sendRdmaBuf = 0;
472 static RDMA_REQUEST        *sendRdmaTail = 0;
473 typedef struct  msg_list_index
474 {
475     int         next;
476     MSG_LIST    *sendSmsgBuf;
477     MSG_LIST    *tail;
478 } MSG_LIST_INDEX;
479
480 #endif
481
482 // buffered send queue
483 #if ! ONE_SEND_QUEUE
484 typedef struct smsg_queue
485 {
486     MSG_LIST_INDEX   *smsg_msglist_index;
487     int               smsg_head_index;
488 } SMSG_QUEUE;
489 #else
490 typedef struct smsg_queue
491 {
492     PCQueue       sendMsgBuf;
493 }  SMSG_QUEUE;
494 #endif
495
496 SMSG_QUEUE                  smsg_queue;
497 #if CMK_USE_OOB
498 SMSG_QUEUE                  smsg_oob_queue;
499 #endif
500
501 #if CMK_SMP
502
503 #define FreeMsgList(d)   free(d);
504 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
505
506 #else
507
508 static MSG_LIST       *msglist_freelist=0;
509
510 #define FreeMsgList(d)  \
511   do { \
512   (d)->next = msglist_freelist;\
513   msglist_freelist = d; \
514   } while (0)
515
516 #define MallocMsgList(d) \
517   do {  \
518   d = msglist_freelist;\
519   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
520              _MEMCHECK(d);\
521   } else msglist_freelist = d->next; \
522   d->next =0;  \
523   } while (0)
524
525 #endif
526
527 #if CMK_SMP
528
529 #define FreeControlMsg(d)      free(d);
530 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
531
532 #else
533
534 static CONTROL_MSG    *control_freelist=0;
535
536 #define FreeControlMsg(d)       \
537   do { \
538   (d)->next = control_freelist;\
539   control_freelist = d; \
540   } while (0);
541
542 #define MallocControlMsg(d) \
543   do {  \
544   d = control_freelist;\
545   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
546              _MEMCHECK(d);\
547   } else control_freelist = d->next;  \
548   } while (0);
549
550 #endif
551
552 #if CMK_SMP
553
554 #define FreeAckMsg(d)      free(d);
555 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
556
557 #else
558
559 static ACK_MSG        *ack_freelist=0;
560
561 #define FreeAckMsg(d)       \
562   do { \
563   (d)->next = ack_freelist;\
564   ack_freelist = d; \
565   } while (0)
566
567 #define MallocAckMsg(d) \
568   do { \
569   d = ack_freelist;\
570   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
571              _MEMCHECK(d);\
572   } else ack_freelist = d->next; \
573   } while (0)
574
575 #endif
576
577
578 # if CMK_SMP
579 #define FreeRdmaRequest(d)       free(d);
580 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
581 #else
582
583 static RDMA_REQUEST         *rdma_freelist = NULL;
584
585 #define FreeRdmaRequest(d)       \
586   do {  \
587   (d)->next = rdma_freelist;\
588   rdma_freelist = d;    \
589   } while (0)
590
591 #define MallocRdmaRequest(d) \
592   do {   \
593   d = rdma_freelist;\
594   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
595              _MEMCHECK(d);\
596   } else rdma_freelist = d->next; \
597   d->next =0;   \
598   } while (0)
599 #endif
600
601 /* reuse gni_post_descriptor_t */
602 static gni_post_descriptor_t *post_freelist=0;
603
604 #if  !CMK_SMP
605 #define FreePostDesc(d)       \
606     (d)->next_descr = post_freelist;\
607     post_freelist = d;
608
609 #define MallocPostDesc(d) \
610   d = post_freelist;\
611   if (d==0) { \
612      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
613      d->next_descr = 0;\
614       _MEMCHECK(d);\
615   } else post_freelist = d->next_descr;
616 #else
617
618 #define FreePostDesc(d)     free(d);
619 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
620
621 #endif
622
623
624 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
625 static int      buffered_smsg_counter = 0;
626
627 /* SmsgSend return success but message sent is not confirmed by remote side */
628 static MSG_LIST *buffered_fma_head = 0;
629 static MSG_LIST *buffered_fma_tail = 0;
630
631 /* functions  */
632 #define IsFree(a,ind)  !( a& (1<<(ind) ))
633 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
634 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
635
636 CpvDeclare(mempool_type*, mempool);
637
638 #if REMOTE_EVENT
639 /* ack pool for remote events */
640
641 #define ACK_SHIFT                  19
642 #define ACK_EVENT(idx)             ((idx<<ACK_SHIFT) | myrank)
643 #define ACK_GET_RANK(evt)          (evt & ((1<<ACK_SHIFT)-1))
644 #define ACK_GET_INDEX(evt)         (evt >> ACK_SHIFT)
645
646 struct AckPool {
647 void *addr;
648 int next;
649 };
650
651 static struct AckPool   *ackpool;
652 static int    ackpoolsize;
653 static int    ackpool_freehead;
654 static CmiNodeLock  ackpool_lock;
655
656 #define  GetAckAddress(s)          (ackpool[s].addr)
657
658 static void AckPool_init()
659 {
660     int i;
661     if ((1<<ACK_SHIFT) < mysize) 
662         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
663     ackpoolsize = 2048;
664     ackpool = (struct AckPool *)malloc(ackpoolsize*sizeof(struct AckPool));
665     for (i=0; i<ackpoolsize-1; i++) {
666         ackpool[i].next = i+1;
667     }
668     ackpool[i].next = -1;
669     ackpool_freehead = 0;
670 #if MULTI_THREAD_SEND 
671     ackpool_lock  = CmiCreateLock();
672 #endif
673 }
674
675 static
676 inline int AckPool_getslot(void *addr)
677 {
678     int i;
679     int s;
680     CMI_GNI_LOCK(ackpool_lock);
681     s = ackpool_freehead;
682     if (s == -1) {
683         int newsize = ackpoolsize * 2;
684         printf("[%d] AckPool_getslot expand to: %d\n", myrank, newsize);
685         if (newsize > (1<<(32-ACK_SHIFT))) CmiAbort("AckPool too large");
686         struct AckPool *old_ackpool = ackpool;
687         ackpool = (struct AckPool *)malloc(newsize*sizeof(struct AckPool));
688         memcpy(ackpool, old_ackpool, ackpoolsize*sizeof(struct AckPool));
689         for (i=ackpoolsize; i<newsize-1; i++) {
690             ackpool[i].next = i+1;
691         }
692         ackpool[i].next = -1;
693         ackpool_freehead = ackpoolsize;
694         s = ackpoolsize;
695         ackpoolsize = newsize;
696         free(old_ackpool);
697     }
698     ackpool_freehead = ackpool[s].next;
699     ackpool[s].addr = addr;
700     CMI_GNI_UNLOCK(ackpool_lock);
701     return s;
702 }
703
704 static
705 inline  void AckPool_freeslot(int s)
706 {
707     CmiAssert(s>=0 && s<ackpoolsize);
708     CMI_GNI_LOCK(ackpool_lock);
709     ackpool[s].next = ackpool_freehead;
710     ackpool_freehead = s;
711     CMI_GNI_UNLOCK(ackpool_lock);
712 }
713
714
715 #endif
716
717 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
718 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
719 #define CHARM_MAGIC_NUMBER               126
720
721 #if CMK_ERROR_CHECKING
722 extern unsigned char computeCheckSum(unsigned char *data, int len);
723 static int checksum_flag = 0;
724 #define CMI_SET_CHECKSUM(msg, len)      \
725         if (checksum_flag)  {   \
726           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
727           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
728         }
729 #define CMI_CHECK_CHECKSUM(msg, len)    \
730         if (checksum_flag)      \
731           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
732             CmiAbort("Fatal error: checksum doesn't agree!\n");
733 #else
734 #define CMI_SET_CHECKSUM(msg, len)
735 #define CMI_CHECK_CHECKSUM(msg, len)
736 #endif
737 /* =====End of Definitions of Message-Corruption Related Macros=====*/
738
739 #if CMK_WITH_STATS
740 FILE *counterLog = NULL;
741 typedef struct comm_thread_stats
742 {
743     uint64_t  smsg_data_count;
744     uint64_t  lmsg_init_count;
745     uint64_t  ack_count;
746     uint64_t  big_msg_ack_count;
747     uint64_t  smsg_count;
748     //times of calling SmsgSend
749     uint64_t  try_smsg_data_count;
750     uint64_t  try_lmsg_init_count;
751     uint64_t  try_ack_count;
752     uint64_t  try_big_msg_ack_count;
753     uint64_t  try_smsg_count;
754     
755     double    max_time_in_send_buffered_smsg;
756     double    all_time_in_send_buffered_smsg;
757
758     uint64_t  rdma_count;
759     uint64_t  try_rdma_count;
760     double    max_time_from_control_to_rdma_init;
761     double    all_time_from_control_to_rdma_init;
762
763     double    max_time_from_rdma_init_to_rdma_done;
764     double    all_time_from_rdma_init_to_rdma_done;
765 } Comm_Thread_Stats;
766
767 static Comm_Thread_Stats   comm_stats;
768
769 static void init_comm_stats()
770 {
771   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
772 }
773
774 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
775
776 #define SMSG_SENT_DONE(creation_time, tag)  \
777         if (print_stats) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
778             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
779             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
780             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
781             comm_stats.smsg_count++; \
782             double inbuff_time = CmiWallTimer() - creation_time;   \
783             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
784             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
785         }
786
787 #define SMSG_TRY_SEND(tag)  \
788         if (print_stats){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
789             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
790             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
791             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
792             comm_stats.try_smsg_count++; \
793         }
794
795 #define  RDMA_TRY_SEND()        if (print_stats) {comm_stats.try_rdma_count++;}
796
797 #define  RDMA_TRANS_DONE(x)      \
798          if (print_stats) {  double rdma_trans_time = CmiWallTimer() - x ; \
799              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
800              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
801          }
802
803 #define  RDMA_TRANS_INIT(x)      \
804          if (print_stats) {   comm_stats.rdma_count++;  \
805              double rdma_trans_time = CmiWallTimer() - x ; \
806              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
807              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
808          }
809
810
811 static void print_comm_stats()
812 {
813     fprintf(counterLog, "Node[%d]SMSG time in buffer\t[max:%f\tAverage:%f](milisecond)\n", myrank, 1000*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
814     fprintf(counterLog, "Node[%d]Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n", myrank, 
815             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
816             comm_stats.ack_count, comm_stats.big_msg_ack_count);
817     
818     fprintf(counterLog, "Node[%d]SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n\n", myrank, 
819             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
820             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count);
821
822     fprintf(counterLog, "Node[%d]Rdma Transaction [count:%lld\t calls:%lld]\n", myrank, comm_stats.rdma_count, comm_stats.try_rdma_count);
823     fprintf(counterLog, "Node[%d]Rdma time from control arrives to rdma init [MAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/comm_stats.rdma_count); 
824     fprintf(counterLog, "Node[%d]Rdma time from init to rdma done [MAX:%f\t Average:%f](milisecond)\n\n", myrank, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/comm_stats.rdma_count); 
825 }
826
827 #else
828 #define STATS_ACK_TIME(x)            x
829 #define STATS_SEND_SMSGS_TIME(x)     x
830 #endif
831
832 static int print_stats = 0;
833
834 static void
835 allgather(void *in,void *out, int len)
836 {
837     static int *ivec_ptr=NULL,already_called=0,job_size=0;
838     int i,rc;
839     int my_rank;
840     char *tmp_buf,*out_ptr;
841
842     if(!already_called) {
843
844         rc = PMI_Get_size(&job_size);
845         CmiAssert(rc == PMI_SUCCESS);
846         rc = PMI_Get_rank(&my_rank);
847         CmiAssert(rc == PMI_SUCCESS);
848
849         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
850         CmiAssert(ivec_ptr != NULL);
851
852         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
853         CmiAssert(rc == PMI_SUCCESS);
854
855         already_called = 1;
856
857     }
858
859     tmp_buf = (char *)malloc(job_size * len);
860     CmiAssert(tmp_buf);
861
862     rc = PMI_Allgather(in,tmp_buf,len);
863     CmiAssert(rc == PMI_SUCCESS);
864
865     out_ptr = out;
866
867     for(i=0;i<job_size;i++) {
868
869         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
870
871     }
872
873     free(tmp_buf);
874 }
875
876 static void
877 allgather_2(void *in,void *out, int len)
878 {
879     //PMI_Allgather is out of order
880     int i,rc, extend_len;
881     int  rank_index;
882     char *out_ptr, *out_ref;
883     char *in2;
884
885     extend_len = sizeof(int) + len;
886     in2 = (char*)malloc(extend_len);
887
888     memcpy(in2, &myrank, sizeof(int));
889     memcpy(in2+sizeof(int), in, len);
890
891     out_ptr = (char*)malloc(mysize*extend_len);
892
893     rc = PMI_Allgather(in2, out_ptr, extend_len);
894     GNI_RC_CHECK("allgather", rc);
895
896     out_ref = out;
897
898     for(i=0;i<mysize;i++) {
899         //rank index 
900         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
901         //copy to the rank index slot
902         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
903     }
904
905     free(out_ptr);
906     free(in2);
907
908 }
909
910 static unsigned int get_gni_nic_address(int device_id)
911 {
912     unsigned int address, cpu_id;
913     gni_return_t status;
914     int i, alps_dev_id=-1,alps_address=-1;
915     char *token, *p_ptr;
916
917     p_ptr = getenv("PMI_GNI_DEV_ID");
918     if (!p_ptr) {
919         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
920        
921         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
922     } else {
923         while ((token = strtok(p_ptr,":")) != NULL) {
924             alps_dev_id = atoi(token);
925             if (alps_dev_id == device_id) {
926                 break;
927             }
928             p_ptr = NULL;
929         }
930         CmiAssert(alps_dev_id != -1);
931         p_ptr = getenv("PMI_GNI_LOC_ADDR");
932         CmiAssert(p_ptr != NULL);
933         i = 0;
934         while ((token = strtok(p_ptr,":")) != NULL) {
935             if (i == alps_dev_id) {
936                 alps_address = atoi(token);
937                 break;
938             }
939             p_ptr = NULL;
940             ++i;
941         }
942         CmiAssert(alps_address != -1);
943         address = alps_address;
944     }
945     return address;
946 }
947
948 static uint8_t get_ptag(void)
949 {
950     char *p_ptr, *token;
951     uint8_t ptag;
952
953     p_ptr = getenv("PMI_GNI_PTAG");
954     CmiAssert(p_ptr != NULL);
955     token = strtok(p_ptr, ":");
956     ptag = (uint8_t)atoi(token);
957     return ptag;
958         
959 }
960
961 static uint32_t get_cookie(void)
962 {
963     uint32_t cookie;
964     char *p_ptr, *token;
965
966     p_ptr = getenv("PMI_GNI_COOKIE");
967     CmiAssert(p_ptr != NULL);
968     token = strtok(p_ptr, ":");
969     cookie = (uint32_t)atoi(token);
970
971     return cookie;
972 }
973
974 #if LARGEPAGE
975
976 /* directly mmap memory from hugetlbfs for large pages */
977
978 #include <sys/stat.h>
979 #include <fcntl.h>
980 #include <sys/mman.h>
981 #include <hugetlbfs.h>
982
983 // size must be _tlbpagesize aligned
984 void *my_get_huge_pages(size_t size)
985 {
986     char filename[512];
987     int fd;
988     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
989     void *ptr = NULL;
990
991     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
992     fd = open(filename, O_RDWR | O_CREAT, mode);
993     if (fd == -1) {
994         CmiAbort("my_get_huge_pages: open filed");
995     }
996     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
997     if (ptr == MAP_FAILED) ptr = NULL;
998 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
999     close(fd);
1000     unlink(filename);
1001     return ptr;
1002 }
1003
1004 void my_free_huge_pages(void *ptr, int size)
1005 {
1006 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1007     int ret = munmap(ptr, size);
1008     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1009 }
1010
1011 #endif
1012
1013 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1014 /* TODO: add any that are related */
1015 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1016
1017
1018 #include "machine-lrts.h"
1019 #include "machine-common-core.c"
1020
1021 /* Network progress function is used to poll the network when for
1022    messages. This flushes receive buffers on some  implementations*/
1023 #if CMK_MACHINE_PROGRESS_DEFINED
1024 void CmiMachineProgressImpl() {
1025 }
1026 #endif
1027
1028 static int SendBufferMsg(SMSG_QUEUE *queue);
1029 static void SendRdmaMsg();
1030 static void PumpNetworkSmsg();
1031 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1032 #if CQWRITE
1033 static void PumpCqWriteTransactions();
1034 #endif
1035 #if REMOTE_EVENT
1036 static void PumpRemoteTransactions();
1037 #endif
1038
1039 #if MACHINE_DEBUG_LOG
1040 FILE *debugLog = NULL;
1041 static CmiInt8 buffered_recv_msg = 0;
1042 int         lrts_smsg_success = 0;
1043 int         lrts_received_msg = 0;
1044 #endif
1045
1046 static void sweep_mempool(mempool_type *mptr)
1047 {
1048     int n = 0;
1049     block_header *current = &(mptr->block_head);
1050
1051     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1052     while( current!= NULL) {
1053         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1054         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1055     }
1056     printf("[n %d] sweep_mempool slot END.\n", myrank);
1057 }
1058
1059 inline
1060 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1061 {
1062     block_header *current = *from;
1063
1064     //while(register_memory_size>= MAX_REG_MEM)
1065     //{
1066         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1067             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1068
1069         *from = current;
1070         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1071         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1072         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1073     //}
1074     return GNI_RC_SUCCESS;
1075 }
1076
1077 inline 
1078 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1079 {
1080     gni_return_t status = GNI_RC_SUCCESS;
1081     //int size = GetMempoolsize(msg);
1082     //void *blockaddr = GetMempoolBlockPtr(msg);
1083     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1084    
1085     block_header *current = &(mptr->block_head);
1086     while(register_memory_size>= MAX_REG_MEM)
1087     {
1088         status = deregisterMemory(mptr, &current);
1089         if (status != GNI_RC_SUCCESS) break;
1090     }
1091     if(register_memory_size>= MAX_REG_MEM) return status;
1092
1093     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1094     while(1)
1095     {
1096         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1097         if(status == GNI_RC_SUCCESS)
1098         {
1099             break;
1100         }
1101         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1102         {
1103             CmiAbort("Memory registor for mempool fails\n");
1104         }
1105         else
1106         {
1107             status = deregisterMemory(mptr, &current);
1108             if (status != GNI_RC_SUCCESS) break;
1109         }
1110     }; 
1111     return status;
1112 }
1113
1114 inline 
1115 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1116 {
1117     static int rank = -1;
1118     int i;
1119     gni_return_t status;
1120     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1121     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1122     mempool_type *mptr;
1123
1124     status = registerFromMempool(mptr1, msg, size, t, cqh);
1125     if (status == GNI_RC_SUCCESS) return status;
1126 #if CMK_SMP 
1127     for (i=0; i<CmiMyNodeSize()+1; i++) {
1128       rank = (rank+1)%(CmiMyNodeSize()+1);
1129       mptr = CpvAccessOther(mempool, rank);
1130       if (mptr == mptr1) continue;
1131       status = registerFromMempool(mptr, msg, size, t, cqh);
1132       if (status == GNI_RC_SUCCESS) return status;
1133     }
1134 #endif
1135     return  GNI_RC_ERROR_RESOURCE;
1136 }
1137
1138 inline
1139 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1140 {
1141     MSG_LIST        *msg_tmp;
1142     MallocMsgList(msg_tmp);
1143     msg_tmp->destNode = destNode;
1144     msg_tmp->size   = size;
1145     msg_tmp->msg    = msg;
1146     msg_tmp->tag    = tag;
1147 #if CMK_WITH_STATS
1148     SMSG_CREATION(msg_tmp)
1149 #endif
1150 #if !CMK_SMP
1151     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1152         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1153         queue->smsg_head_index = destNode;
1154         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1155     }else
1156     {
1157         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1158         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1159     }
1160 #else
1161 #if ONE_SEND_QUEUE
1162     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1163 #else
1164 #if SMP_LOCKS
1165     CmiLock(queue->smsg_msglist_index[destNode].lock);
1166     if(queue->smsg_msglist_index[destNode].pushed == 0)
1167     {
1168         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1169     }
1170     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1171     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1172 #else
1173     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1174 #endif
1175 #endif
1176 #endif
1177 #if PRINT_SYH
1178     buffered_smsg_counter++;
1179 #endif
1180 }
1181
1182 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1183 {
1184     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1185 }
1186
1187 inline
1188 static void setup_smsg_connection(int destNode)
1189 {
1190     mdh_addr_list_t  *new_entry = 0;
1191     gni_post_descriptor_t *pd;
1192     gni_smsg_attr_t      *smsg_attr;
1193     gni_return_t status = GNI_RC_NOT_DONE;
1194     RDMA_REQUEST        *rdma_request_msg;
1195     
1196     if(smsg_available_slot == smsg_expand_slots)
1197     {
1198         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1199         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1200         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1201
1202         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1203             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1204             GNI_MEM_READWRITE,   
1205             -1,
1206             &(new_entry->mdh));
1207         smsg_available_slot = 0; 
1208         new_entry->next = smsg_dynamic_list;
1209         smsg_dynamic_list = new_entry;
1210     }
1211     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1212     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1213     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1214     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1215     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1216     smsg_attr->buff_size = smsg_memlen;
1217     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1218     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1219     smsg_local_attr_vec[destNode] = smsg_attr;
1220     smsg_available_slot++;
1221     MallocPostDesc(pd);
1222     pd->type            = GNI_POST_FMA_PUT;
1223     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1224     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1225     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1226     pd->length          = sizeof(gni_smsg_attr_t);
1227     pd->local_addr      = (uint64_t) smsg_attr;
1228     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1229     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1230     pd->src_cq_hndl     = rdma_tx_cqh;
1231     pd->rdma_mode       = 0;
1232     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1233     print_smsg_attr(smsg_attr);
1234     if(status == GNI_RC_ERROR_RESOURCE )
1235     {
1236         MallocRdmaRequest(rdma_request_msg);
1237         rdma_request_msg->destNode = destNode;
1238         rdma_request_msg->pd = pd;
1239         /* buffer this request */
1240     }
1241 #if PRINT_SYH
1242     if(status != GNI_RC_SUCCESS)
1243        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1244     else
1245         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1246 #endif
1247 }
1248
1249 /* useDynamicSMSG */
1250 inline 
1251 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1252 {
1253     gni_return_t status = GNI_RC_NOT_DONE;
1254
1255     if(mailbox_list->offset == mailbox_list->size)
1256     {
1257         dynamic_smsg_mailbox_t *new_mailbox_entry;
1258         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1259         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1260         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1261         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1262         new_mailbox_entry->offset = 0;
1263         
1264         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1265             new_mailbox_entry->size, smsg_rx_cqh,
1266             GNI_MEM_READWRITE,   
1267             -1,
1268             &(new_mailbox_entry->mem_hndl));
1269
1270         GNI_RC_CHECK("register", status);
1271         new_mailbox_entry->next = mailbox_list;
1272         mailbox_list = new_mailbox_entry;
1273     }
1274     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1275     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1276     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1277     local_smsg_attr->mbox_offset = mailbox_list->offset;
1278     mailbox_list->offset += smsg_memlen;
1279     local_smsg_attr->buff_size = smsg_memlen;
1280     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1281     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1282 }
1283
1284 /* useDynamicSMSG */
1285 inline 
1286 static int connect_to(int destNode)
1287 {
1288     gni_return_t status = GNI_RC_NOT_DONE;
1289     CmiAssert(smsg_connected_flag[destNode] == 0);
1290     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1291     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1292     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1293     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1294     
1295     CMI_GNI_LOCK(global_gni_lock)
1296     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1297     CMI_GNI_UNLOCK(global_gni_lock)
1298     if (status == GNI_RC_ERROR_RESOURCE) {
1299       /* possibly destNode is making connection at the same time */
1300       free(smsg_attr_vector_local[destNode]);
1301       smsg_attr_vector_local[destNode] = NULL;
1302       free(smsg_attr_vector_remote[destNode]);
1303       smsg_attr_vector_remote[destNode] = NULL;
1304       mailbox_list->offset -= smsg_memlen;
1305       return 0;
1306     }
1307     GNI_RC_CHECK("GNI_Post", status);
1308     smsg_connected_flag[destNode] = 1;
1309     return 1;
1310 }
1311
1312 inline 
1313 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1314 {
1315     unsigned int          remote_address;
1316     uint32_t              remote_id;
1317     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1318     gni_smsg_attr_t       *smsg_attr;
1319     gni_post_descriptor_t *pd;
1320     gni_post_state_t      post_state;
1321     char                  *real_data; 
1322
1323     if (useDynamicSMSG) {
1324         switch (smsg_connected_flag[destNode]) {
1325         case 0: 
1326             connect_to(destNode);         /* continue to case 1 */
1327         case 1:                           /* pending connection, do nothing */
1328             status = GNI_RC_NOT_DONE;
1329             if(inbuff ==0)
1330                 buffer_small_msgs(queue, msg, size, destNode, tag);
1331             return status;
1332         }
1333     }
1334 #if CMK_SMP
1335 #if ! ONE_SEND_QUEUE
1336     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1337 #endif
1338     {
1339 #else
1340     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1341     {
1342 #endif
1343         //CMI_GNI_LOCK(smsg_mailbox_lock)
1344         CMI_GNI_LOCK(default_tx_cq_lock)
1345 #if CMK_SMP_TRACE_COMMTHREAD
1346         int oldpe = -1;
1347         int oldeventid = -1;
1348         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1349         { 
1350             START_EVENT();
1351             if ( tag == SMALL_DATA_TAG)
1352                 real_data = (char*)msg; 
1353             else 
1354                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1355             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1356             TRACE_COMM_SET_COMM_MSGID(real_data);
1357         }
1358 #endif
1359 #if REMOTE_EVENT
1360         if (tag == LMSG_INIT_TAG) {
1361             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1362             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1363                 control_msg_tmp->ack_index = AckPool_getslot((void*)control_msg_tmp->source_addr);
1364         }
1365         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1366 #endif
1367 #if     CMK_WITH_STATS
1368         SMSG_TRY_SEND(tag)
1369 #endif
1370 #if CMK_WITH_STATS
1371     double              creation_time;
1372     if (ptr == NULL)
1373         creation_time = CmiWallTimer();
1374     else
1375         creation_time = ptr->creation_time;
1376 #endif
1377
1378     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1379 #if CMK_SMP_TRACE_COMMTHREAD
1380         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1381 #endif
1382         CMI_GNI_UNLOCK(default_tx_cq_lock)
1383         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1384         if(status == GNI_RC_SUCCESS)
1385         {
1386 #if     CMK_WITH_STATS
1387             SMSG_SENT_DONE(creation_time,tag) 
1388 #endif
1389 #if CMK_SMP_TRACE_COMMTHREAD
1390             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1391             { 
1392                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1393             }
1394 #endif
1395         }else
1396             status = GNI_RC_ERROR_RESOURCE;
1397     }
1398     if(status != GNI_RC_SUCCESS && inbuff ==0)
1399         buffer_small_msgs(queue, msg, size, destNode, tag);
1400     return status;
1401 }
1402
1403 inline 
1404 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1405 {
1406     /* construct a control message and send */
1407     CONTROL_MSG         *control_msg_tmp;
1408     MallocControlMsg(control_msg_tmp);
1409     control_msg_tmp->source_addr = (uint64_t)msg;
1410     control_msg_tmp->seq_id    = seqno;
1411     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1412 #if REMOTE_EVENT
1413     control_msg_tmp->ack_index    =  -1;
1414 #endif
1415 #if     USE_LRTS_MEMPOOL
1416     if(size < BIG_MSG)
1417     {
1418         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1419     }
1420     else
1421     {
1422         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1423         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1424         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1425     }
1426 #else
1427     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1428 #endif
1429     return control_msg_tmp;
1430 }
1431
1432 #define BLOCKING_SEND_CONTROL    0
1433
1434 // Large message, send control to receiver, receiver register memory and do a GET, 
1435 // return 1 - send no success
1436 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1437 {
1438     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1439     uint32_t            vmdh_index  = -1;
1440     int                 size;
1441     int                 offset = 0;
1442     uint64_t            source_addr;
1443     int                 register_size; 
1444     void                *msg;
1445
1446     size    =   control_msg_tmp->total_length;
1447     source_addr = control_msg_tmp->source_addr;
1448     register_size = control_msg_tmp->length;
1449
1450 #if  USE_LRTS_MEMPOOL
1451     if( control_msg_tmp->seq_id == 0 ){
1452 #if BLOCKING_SEND_CONTROL
1453         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1454             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1455                 LrtsAdvanceCommunication(0);
1456         }
1457 #endif
1458         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1459         {
1460             msg = (void*)source_addr;
1461             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1462             {
1463                 if(!inbuff)
1464                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1465                 return GNI_RC_ERROR_NOMEM;
1466             }
1467             //register the corresponding mempool
1468             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1469             if(status == GNI_RC_SUCCESS)
1470             {
1471                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1472             }
1473         }else
1474         {
1475             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1476             status = GNI_RC_SUCCESS;
1477         }
1478         if(NoMsgInSend(source_addr))
1479             register_size = GetMempoolsize((void*)(source_addr));
1480         else
1481             register_size = 0;
1482     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1483     {
1484         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1485         source_addr += offset;
1486         size = control_msg_tmp->length;
1487 #if BLOCKING_SEND_CONTROL
1488         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1489             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1490                 LrtsAdvanceCommunication(0);
1491         }
1492 #endif
1493         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1494             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1495             {
1496                 if(!inbuff)
1497                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1498                 return GNI_RC_ERROR_NOMEM;
1499             }
1500             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1501             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1502         }
1503         else
1504         {
1505             status = GNI_RC_SUCCESS;
1506         }
1507         register_size = 0;  
1508     }
1509
1510     if(status == GNI_RC_SUCCESS)
1511     {
1512         status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1513         if(status == GNI_RC_SUCCESS)
1514         {
1515             buffered_send_msg += register_size;
1516             if(control_msg_tmp->seq_id == 0)
1517             {
1518                 IncreaseMsgInSend(source_addr);
1519             }
1520             FreeControlMsg(control_msg_tmp);
1521             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1522         }else
1523             status = GNI_RC_ERROR_RESOURCE;
1524
1525     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1526     {
1527         CmiAbort("Memory registor for large msg\n");
1528     }else 
1529     {
1530         status = GNI_RC_ERROR_NOMEM; 
1531         if(!inbuff)
1532             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1533     }
1534     return status;
1535 #else
1536     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1537     if(status == GNI_RC_SUCCESS)
1538     {
1539         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1540         if(status == GNI_RC_SUCCESS)
1541         {
1542             FreeControlMsg(control_msg_tmp);
1543         }
1544     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1545     {
1546         CmiAbort("Memory registor for large msg\n");
1547     }else 
1548     {
1549         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1550     }
1551     return status;
1552 #endif
1553 }
1554
1555 inline void LrtsPrepareEnvelope(char *msg, int size)
1556 {
1557     CmiSetMsgSize(msg, size);
1558     CMI_SET_CHECKSUM(msg, size);
1559 }
1560
1561 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1562 {
1563     gni_return_t        status  =   GNI_RC_SUCCESS;
1564     uint8_t tag;
1565     CONTROL_MSG         *control_msg_tmp;
1566     int                 oob = ( mode & OUT_OF_BAND);
1567     SMSG_QUEUE          *queue;
1568
1569     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1570 #if CMK_USE_OOB
1571     queue = oob? &smsg_oob_queue : &smsg_queue;
1572 #else
1573     queue = &smsg_queue;
1574 #endif
1575
1576     LrtsPrepareEnvelope(msg, size);
1577
1578 #if PRINT_SYH
1579     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1580 #endif 
1581 #if CMK_SMP 
1582     if(size <= SMSG_MAX_MSG)
1583         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1584     else if (size < BIG_MSG) {
1585         control_msg_tmp =  construct_control_msg(size, msg, 0);
1586         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1587     }
1588     else {
1589           CmiSetMsgSeq(msg, 0);
1590           control_msg_tmp =  construct_control_msg(size, msg, 1);
1591           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1592     }
1593 #else   //non-smp, smp(worker sending)
1594     if(size <= SMSG_MAX_MSG)
1595     {
1596         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1597             CmiFree(msg);
1598     }
1599     else if (size < BIG_MSG) {
1600         control_msg_tmp =  construct_control_msg(size, msg, 0);
1601         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1602     }
1603     else {
1604 #if     USE_LRTS_MEMPOOL
1605         CmiSetMsgSeq(msg, 0);
1606         control_msg_tmp =  construct_control_msg(size, msg, 1);
1607         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1608 #else
1609         control_msg_tmp =  construct_control_msg(size, msg, 0);
1610         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1611 #endif
1612     }
1613 #endif
1614     return 0;
1615 }
1616
1617 static void    PumpDatagramConnection();
1618 static      int         event_SetupConnect = 111;
1619 static      int         event_PumpSmsg = 222 ;
1620 static      int         event_PumpTransaction = 333;
1621 static      int         event_PumpRdmaTransaction = 444;
1622 static      int         event_SendBufferSmsg = 444;
1623 static      int         event_SendFmaRdmaMsg = 555;
1624 static      int         event_AdvanceCommunication = 666;
1625
1626 static void registerUserTraceEvents() {
1627 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1628     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1629     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1630     event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
1631     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1632     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1633     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1634     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1635 #endif
1636 }
1637
1638 static void ProcessDeadlock()
1639 {
1640     static CmiUInt8 *ptr = NULL;
1641     static CmiUInt8  last = 0, mysum, sum;
1642     static int count = 0;
1643     gni_return_t status;
1644     int i;
1645
1646 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1647 //sweep_mempool(CpvAccess(mempool));
1648     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1649     mysum = smsg_send_count + smsg_recv_count;
1650     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1651     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1652     GNI_RC_CHECK("PMI_Allgather", status);
1653     sum = 0;
1654     for (i=0; i<mysize; i++)  sum+= ptr[i];
1655     if (last == 0 || sum == last) 
1656         count++;
1657     else
1658         count = 0;
1659     last = sum;
1660     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1661     if (count == 2) { 
1662         /* detected twice, it is a real deadlock */
1663         if (myrank == 0)  {
1664             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1665             CmiAbort("Fatal> Deadlock detected.");
1666         }
1667
1668     }
1669     _detected_hang = 0;
1670 }
1671
1672 static void CheckProgress()
1673 {
1674     if (smsg_send_count == last_smsg_send_count &&
1675         smsg_recv_count == last_smsg_recv_count ) 
1676     {
1677         _detected_hang = 1;
1678 #if !CMK_SMP
1679         if (_detected_hang) ProcessDeadlock();
1680 #endif
1681
1682     }
1683     else {
1684         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1685         last_smsg_send_count = smsg_send_count;
1686         last_smsg_recv_count = smsg_recv_count;
1687         _detected_hang = 0;
1688     }
1689 }
1690
1691 static void set_limit()
1692 {
1693     //if (!user_set_flag && CmiMyRank() == 0) {
1694     if (CmiMyRank() == 0) {
1695         int mynode = CmiPhysicalNodeID(CmiMyPe());
1696         int numpes = CmiNumPesOnPhysicalNode(mynode);
1697         int numprocesses = numpes / CmiMyNodeSize();
1698         MAX_REG_MEM  = _totalmem / numprocesses;
1699         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1700         if (CmiMyPe() == 0)
1701            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1702         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1703         {
1704              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1705              CmiAbort("memory registration\n");
1706         }
1707     }
1708 }
1709
1710 void LrtsPostCommonInit(int everReturn)
1711 {
1712 #if CMK_DIRECT
1713     CmiDirectInit();
1714 #endif
1715 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1716     CpvInitialize(double, projTraceStart);
1717     /* only PE 0 needs to care about registration (to generate sts file). */
1718     //if (CmiMyPe() == 0) 
1719     {
1720         registerMachineUserEventsFunction(&registerUserTraceEvents);
1721     }
1722 #endif
1723
1724 #if CMK_SMP
1725     CmiIdleState *s=CmiNotifyGetState();
1726     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1727     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1728 #else
1729     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1730     if (useDynamicSMSG)
1731     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1732 #endif
1733
1734 #if ! LARGEPAGE
1735     if (_checkProgress)
1736 #if CMK_SMP
1737     if (CmiMyRank() == 0)
1738 #endif
1739     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1740 #endif
1741  
1742 #if !LARGEPAGE
1743     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1744 #endif
1745 }
1746
1747 /* this is called by worker thread */
1748 void LrtsPostNonLocal(){
1749 #if CMK_SMP_TRACE_COMMTHREAD
1750     double startT, endT;
1751 #endif
1752 #if MULTI_THREAD_SEND
1753     if(mysize == 1) return;
1754 #if CMK_SMP_TRACE_COMMTHREAD
1755     traceEndIdle();
1756 #endif
1757
1758 #if CMK_SMP_TRACE_COMMTHREAD
1759     startT = CmiWallTimer();
1760 #endif
1761
1762 #if CMK_WORKER_SINGLE_TASK
1763     if (CmiMyRank() % 6 == 0)
1764 #endif
1765     PumpNetworkSmsg();
1766
1767 #if CMK_WORKER_SINGLE_TASK
1768     if (CmiMyRank() % 6 == 1)
1769 #endif
1770     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1771
1772 #if CMK_WORKER_SINGLE_TASK
1773     if (CmiMyRank() % 6 == 2)
1774 #endif
1775     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1776
1777 #if REMOTE_EVENT
1778 #if CMK_WORKER_SINGLE_TASK
1779     if (CmiMyRank() % 6 == 3)
1780 #endif
1781     PumpRemoteTransactions();
1782 #endif
1783
1784 #if CMK_USE_OOB
1785     if (SendBufferMsg(&smsg_oob_queue) == 1)
1786 #endif
1787     {
1788 #if CMK_WORKER_SINGLE_TASK
1789     if (CmiMyRank() % 6 == 4)
1790 #endif
1791         SendBufferMsg(&smsg_queue);
1792     }
1793
1794 #if CMK_WORKER_SINGLE_TASK
1795     if (CmiMyRank() % 6 == 5)
1796 #endif
1797     SendRdmaMsg();
1798
1799 #if CMK_SMP_TRACE_COMMTHREAD
1800     endT = CmiWallTimer();
1801     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
1802 #endif
1803 #if CMK_SMP_TRACE_COMMTHREAD
1804     traceBeginIdle();
1805 #endif
1806 #endif
1807 }
1808
1809 /* useDynamicSMSG */
1810 static void    PumpDatagramConnection()
1811 {
1812     uint32_t          remote_address;
1813     uint32_t          remote_id;
1814     gni_return_t status;
1815     gni_post_state_t  post_state;
1816     uint64_t          datagram_id;
1817     int i;
1818
1819    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1820    {
1821        if (datagram_id >= mysize) {           /* bound endpoint */
1822            int pe = datagram_id - mysize;
1823            CMI_GNI_LOCK(global_gni_lock)
1824            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1825            CMI_GNI_UNLOCK(global_gni_lock)
1826            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1827            {
1828                CmiAssert(remote_id == pe);
1829                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1830                GNI_RC_CHECK("Dynamic SMSG Init", status);
1831 #if PRINT_SYH
1832                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1833 #endif
1834                CmiAssert(smsg_connected_flag[pe] == 1);
1835                smsg_connected_flag[pe] = 2;
1836            }
1837        }
1838        else {         /* unbound ep */
1839            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1840            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1841            {
1842                CmiAssert(remote_id<mysize);
1843                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1844                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1845                GNI_RC_CHECK("Dynamic SMSG Init", status);
1846 #if PRINT_SYH
1847                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1848 #endif
1849                smsg_connected_flag[remote_id] = 2;
1850
1851                alloc_smsg_attr(&send_smsg_attr);
1852                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1853                GNI_RC_CHECK("post unbound datagram", status);
1854            }
1855        }
1856    }
1857 }
1858
1859 /* pooling CQ to receive network message */
1860 static void PumpNetworkRdmaMsgs()
1861 {
1862     gni_cq_entry_t      event_data;
1863     gni_return_t        status;
1864
1865 }
1866
1867 inline 
1868 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
1869 {
1870     RDMA_REQUEST        *rdma_request_msg;
1871     MallocRdmaRequest(rdma_request_msg);
1872     rdma_request_msg->destNode = inst_id;
1873     rdma_request_msg->pd = pd;
1874 #if REMOTE_EVENT
1875     rdma_request_msg->ack_index = ack_index;
1876 #endif
1877 #if CMK_SMP
1878     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1879 #else
1880     if(sendRdmaBuf == 0)
1881     {
1882         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1883     }else{
1884         sendRdmaTail->next = rdma_request_msg;
1885         sendRdmaTail =  rdma_request_msg;
1886     }
1887 #endif
1888
1889 }
1890
1891 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1892
1893 static void PumpNetworkSmsg()
1894 {
1895     uint64_t            inst_id;
1896     int                 ret;
1897     gni_cq_entry_t      event_data;
1898     gni_return_t        status, status2;
1899     void                *header;
1900     uint8_t             msg_tag;
1901     int                 msg_nbytes;
1902     void                *msg_data;
1903     gni_mem_handle_t    msg_mem_hndl;
1904     gni_smsg_attr_t     *smsg_attr;
1905     gni_smsg_attr_t     *remote_smsg_attr;
1906     int                 init_flag;
1907     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1908     uint64_t            source_addr;
1909     SMSG_QUEUE         *queue = &smsg_queue;
1910 #if     CMK_DIRECT
1911     cmidirectMsg        *direct_msg;
1912 #endif
1913     while(1)
1914     {
1915         CMI_GNI_LOCK(smsg_rx_cq_lock)
1916         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1917         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
1918         if(status != GNI_RC_SUCCESS)
1919             break;
1920         inst_id = GNI_CQ_GET_INST_ID(event_data);
1921 #if REMOTE_EVENT
1922         inst_id = ACK_GET_RANK(inst_id);
1923 #endif
1924         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1925 #if PRINT_SYH
1926         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1927 #endif
1928         if (useDynamicSMSG) {
1929             /* subtle: smsg may come before connection is setup */
1930             while (smsg_connected_flag[inst_id] != 2) 
1931                PumpDatagramConnection();
1932         }
1933         msg_tag = GNI_SMSG_ANY_TAG;
1934         while(1) {
1935             CMI_GNI_LOCK(smsg_mailbox_lock)
1936             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1937             if (status != GNI_RC_SUCCESS)
1938             {
1939                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1940                 break;
1941             }
1942 #if PRINT_SYH
1943             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1944 #endif
1945             /* copy msg out and then put into queue (small message) */
1946             switch (msg_tag) {
1947             case SMALL_DATA_TAG:
1948             {
1949                 START_EVENT();
1950                 msg_nbytes = CmiGetMsgSize(header);
1951                 msg_data    = CmiAlloc(msg_nbytes);
1952                 memcpy(msg_data, (char*)header, msg_nbytes);
1953                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1954                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1955                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1956                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
1957                 handleOneRecvedMsg(msg_nbytes, msg_data);
1958                 break;
1959             }
1960             case LMSG_INIT_TAG:
1961             {
1962 #if MULTI_THREAD_SEND
1963                 MallocControlMsg(control_msg_tmp);
1964                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
1965                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1966                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1967                 getLargeMsgRequest(control_msg_tmp, inst_id);
1968                 FreeControlMsg(control_msg_tmp);
1969 #else
1970                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1971                 getLargeMsgRequest(header, inst_id);
1972                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1973 #endif
1974                 break;
1975             }
1976             case ACK_TAG:   //msg fit into mempool
1977             {
1978                 /* Get is done, release message . Now put is not used yet*/
1979                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1980                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1981                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1982 #if ! USE_LRTS_MEMPOOL
1983                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1984 #else
1985                 DecreaseMsgInSend(msg);
1986 #endif
1987                 if(NoMsgInSend(msg))
1988                     buffered_send_msg -= GetMempoolsize(msg);
1989                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1990                 CmiFree(msg);
1991                 break;
1992             }
1993             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1994             {
1995 #if MULTI_THREAD_SEND
1996                 MallocControlMsg(header_tmp);
1997                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
1998                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1999 #else
2000                 header_tmp = (CONTROL_MSG *) header;
2001 #endif
2002                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2003                 void *msg = (void*)(header_tmp->source_addr);
2004                 int cur_seq = CmiGetMsgSeq(msg);
2005                 int offset = ONE_SEG*(cur_seq+1);
2006                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2007                 buffered_send_msg -= header_tmp->length;
2008                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2009                 if (remain_size < 0) remain_size = 0;
2010                 CmiSetMsgSize(msg, remain_size);
2011                 if(remain_size <= 0) //transaction done
2012                 {
2013                     CmiFree(msg);
2014                 }else if (header_tmp->total_length > offset)
2015                 {
2016                     CmiSetMsgSeq(msg, cur_seq+1);
2017                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2018                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2019                     //send next seg
2020                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2021                          // pipelining
2022                     if (header_tmp->seq_id == 1) {
2023                       int i;
2024                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2025                         int seq = cur_seq+i+2;
2026                         CmiSetMsgSeq(msg, seq-1);
2027                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2028                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2029                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2030                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2031                       }
2032                     }
2033                 }
2034 #if MULTI_THREAD_SEND
2035                 FreeControlMsg(header_tmp);
2036 #else
2037                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2038 #endif
2039                 break;
2040             }
2041 #if CMK_PERSISTENT_COMM
2042             case PUT_DONE_TAG:  {   //persistent message
2043                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2044                 int size = ((CONTROL_MSG *) header)->length;
2045                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2046                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2047                 CmiReference(msg);
2048                 CMI_CHECK_CHECKSUM(msg, size);
2049                 handleOneRecvedMsg(size, msg); 
2050 #if PRINT_SYH
2051                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2052 #endif
2053                 break;
2054             }
2055 #endif
2056 #if CMK_DIRECT
2057             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2058                 //create a trigger message
2059                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2060                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2061                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2062                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2063                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2064                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2065                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2066                 break;
2067 #endif
2068             default: {
2069                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2070                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2071                 printf("weird tag problem\n");
2072                 CmiAbort("Unknown tag\n");
2073                      }
2074             }               // end switch
2075 #if PRINT_SYH
2076             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2077 #endif
2078             smsg_recv_count ++;
2079             msg_tag = GNI_SMSG_ANY_TAG;
2080         } //endwhile getNext
2081     }   //end while GetEvent
2082     if(status == GNI_RC_ERROR_RESOURCE)
2083     {
2084         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2085         GNI_RC_CHECK("Smsg_rx_cq full", status);
2086     }
2087 }
2088
2089 static void printDesc(gni_post_descriptor_t *pd)
2090 {
2091     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2092 }
2093
2094 #if CQWRITE
2095 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2096 {
2097     gni_post_descriptor_t *pd;
2098     gni_return_t        status = GNI_RC_SUCCESS;
2099     
2100     MallocPostDesc(pd);
2101     pd->type = GNI_POST_CQWRITE;
2102     pd->cq_mode = GNI_CQMODE_SILENT;
2103     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2104     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2105     pd->cqwrite_value = data;
2106     pd->remote_mem_hndl = mem_hndl;
2107     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2108     GNI_RC_CHECK("GNI_PostCqWrite", status);
2109 }
2110 #endif
2111
2112 // register memory for a message
2113 // return mem handle
2114 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2115 {
2116     gni_return_t status = GNI_RC_SUCCESS;
2117
2118     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2119
2120 #if CMK_PERSISTENT_COMM
2121         // persistent message is always registered
2122     if (!IsMemHndlZero(MEMHFIELD(msg))) {
2123         *memh = MEMHFIELD(msg);
2124         return GNI_RC_SUCCESS;
2125     }
2126 #endif
2127     if(seqno == 0)
2128     {
2129         if(IsMemHndlZero((GetMemHndl(msg))))
2130         {
2131             msg = (void*)(msg);
2132             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2133             if(status == GNI_RC_SUCCESS)
2134                 *memh = GetMemHndl(msg);
2135         }
2136         else {
2137             *memh = GetMemHndl(msg);
2138         }
2139     }
2140     else {
2141         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2142         status = registerMemory(msg, size, memh, NULL); 
2143     }
2144     return status;
2145 }
2146
2147 // for BIG_MSG called on receiver side for receiving control message
2148 // LMSG_INIT_TAG
2149 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2150 {
2151 #if     USE_LRTS_MEMPOOL
2152     CONTROL_MSG         *request_msg;
2153     gni_return_t        status = GNI_RC_SUCCESS;
2154     void                *msg_data;
2155     gni_post_descriptor_t *pd;
2156     gni_mem_handle_t    msg_mem_hndl;
2157     int source, size, transaction_size, offset = 0;
2158     size_t     register_size = 0;
2159
2160     // initial a get to transfer data from the sender side */
2161     request_msg = (CONTROL_MSG *) header;
2162     size = request_msg->total_length;
2163     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2164     MallocPostDesc(pd);
2165 #if CMK_WITH_STATS 
2166     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2167 #endif
2168     if(request_msg->seq_id < 2)   {
2169 #if CMK_SMP_TRACE_COMMTHREAD 
2170         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2171 #endif
2172         msg_data = CmiAlloc(size);
2173         CmiSetMsgSeq(msg_data, 0);
2174         _MEMCHECK(msg_data);
2175     }
2176     else {
2177         offset = ONE_SEG*(request_msg->seq_id-1);
2178         msg_data = (char*)request_msg->dest_addr + offset;
2179     }
2180    
2181     pd->cqwrite_value = request_msg->seq_id;
2182
2183     SetMemHndlZero(pd->local_mem_hndl);
2184     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2185     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2186     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2187         if(NoMsgInRecv( (void*)(msg_data)))
2188             register_size = GetMempoolsize((void*)(msg_data));
2189         else
2190             register_size = 0;
2191     }
2192
2193 #if 0
2194     if( request_msg->seq_id == 0)
2195     {
2196         pd->local_mem_hndl= GetMemHndl(msg_data);
2197         transaction_size = ALIGN64(size);
2198         if(IsMemHndlZero(pd->local_mem_hndl))
2199         {   
2200             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)), rdma_rx_cqh);
2201             if(status == GNI_RC_SUCCESS)
2202             {
2203                 pd->local_mem_hndl = GetMemHndl(msg_data);
2204             }
2205             else
2206             {
2207                 SetMemHndlZero(pd->local_mem_hndl);
2208             }
2209         }
2210         if(NoMsgInRecv( (void*)(msg_data)))
2211             register_size = GetMempoolsize((void*)(msg_data));
2212         else
2213             register_size = 0;
2214     }
2215     else{
2216         transaction_size = ALIGN64(request_msg->length);
2217         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl), NULL); 
2218         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2219         {
2220             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2221         }
2222     }
2223 #endif
2224     pd->first_operand = ALIGN64(size);                   //  total length
2225
2226     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2227         pd->type            = GNI_POST_FMA_GET;
2228     else
2229         pd->type            = GNI_POST_RDMA_GET;
2230     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2231     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2232     pd->length          = transaction_size;
2233     pd->local_addr      = (uint64_t) msg_data;
2234     pd->remote_addr     = request_msg->source_addr + offset;
2235     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2236     pd->src_cq_hndl     = rdma_tx_cqh;
2237     pd->rdma_mode       = 0;
2238     pd->amo_cmd         = 0;
2239
2240     //memory registration success
2241     if(status == GNI_RC_SUCCESS)
2242     {
2243         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2244         CMI_GNI_LOCK(lock)
2245 #if REMOTE_EVENT
2246         if( request_msg->seq_id == 0)
2247         {
2248             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2249             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2250             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2251         }
2252         else {
2253             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, myrank);
2254             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2255         }
2256 #endif
2257
2258 #if CMK_WITH_STATS
2259         RDMA_TRY_SEND()
2260 #endif
2261         if(pd->type == GNI_POST_RDMA_GET) 
2262         {
2263             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2264         }
2265         else
2266         {
2267             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2268         }
2269         CMI_GNI_UNLOCK(lock)
2270
2271         if(status == GNI_RC_SUCCESS )
2272         {
2273             if(pd->cqwrite_value == 0)
2274             {
2275 #if MACHINE_DEBUG_LOG
2276                 buffered_recv_msg += register_size;
2277                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2278 #endif
2279                 IncreaseMsgInRecv(msg_data);
2280 #if CMK_SMP_TRACE_COMMTHREAD 
2281                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2282 #endif
2283             }
2284 #if  CMK_WITH_STATS
2285                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2286                 RDMA_TRANS_INIT(pd->sync_flag_addr/1000000.0)
2287 #endif
2288         }
2289     }else
2290     {
2291         SetMemHndlZero((pd->local_mem_hndl));
2292     }
2293     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2294     {
2295 #if REMOTE_EVENT
2296         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2297 #else
2298         bufferRdmaMsg(inst_id, pd, -1); 
2299 #endif
2300     }else {
2301          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
2302         GNI_RC_CHECK("GetLargeAFter posting", status);
2303     }
2304 #else
2305     CONTROL_MSG         *request_msg;
2306     gni_return_t        status;
2307     void                *msg_data;
2308     gni_post_descriptor_t *pd;
2309     RDMA_REQUEST        *rdma_request_msg;
2310     gni_mem_handle_t    msg_mem_hndl;
2311     //int source;
2312     // initial a get to transfer data from the sender side */
2313     request_msg = (CONTROL_MSG *) header;
2314     msg_data = CmiAlloc(request_msg->length);
2315     _MEMCHECK(msg_data);
2316
2317     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2318
2319     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2320     {
2321         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2322     }
2323
2324     MallocPostDesc(pd);
2325     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2326         pd->type            = GNI_POST_FMA_GET;
2327     else
2328         pd->type            = GNI_POST_RDMA_GET;
2329     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2330     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2331     pd->length          = ALIGN64(request_msg->length);
2332     pd->local_addr      = (uint64_t) msg_data;
2333     pd->remote_addr     = request_msg->source_addr;
2334     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2335     pd->src_cq_hndl     = rdma_tx_cqh;
2336     pd->rdma_mode       = 0;
2337     pd->amo_cmd         = 0;
2338
2339     //memory registration successful
2340     if(status == GNI_RC_SUCCESS)
2341     {
2342         pd->local_mem_hndl  = msg_mem_hndl;
2343        
2344         if(pd->type == GNI_POST_RDMA_GET) 
2345         {
2346             CMI_GNI_LOCK(rdma_tx_cq_lock)
2347             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2348             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2349         }
2350         else
2351         {
2352             CMI_GNI_LOCK(default_tx_cq_lock)
2353             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2354             CMI_GNI_UNLOCK(default_tx_cq_lock)
2355         }
2356
2357     }else
2358     {
2359         SetMemHndlZero(pd->local_mem_hndl);
2360     }
2361     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2362     {
2363         MallocRdmaRequest(rdma_request_msg);
2364         rdma_request_msg->next = 0;
2365         rdma_request_msg->destNode = inst_id;
2366         rdma_request_msg->pd = pd;
2367         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2368     }else {
2369         GNI_RC_CHECK("AFter posting", status);
2370     }
2371 #endif
2372 }
2373
2374 #if CQWRITE
2375 static void PumpCqWriteTransactions()
2376 {
2377
2378     gni_cq_entry_t          ev;
2379     gni_return_t            status;
2380     void                    *msg;   
2381     while(1) {
2382         //CMI_GNI_LOCK(my_cq_lock) 
2383         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2384         //CMI_GNI_UNLOCK(my_cq_lock)
2385         if(status != GNI_RC_SUCCESS) break;
2386         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2387
2388         DecreaseMsgInSend(msg);
2389 #if ! USE_LRTS_MEMPOOL
2390        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2391 #else
2392         DecreaseMsgInSend(msg);
2393 #endif
2394         if(NoMsgInSend(msg))
2395             buffered_send_msg -= GetMempoolsize(msg);
2396         CmiFree(msg);
2397     };
2398     if(status == GNI_RC_ERROR_RESOURCE)
2399     {
2400         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2401     }
2402 }
2403 #endif
2404
2405 #if REMOTE_EVENT
2406 static void PumpRemoteTransactions()
2407 {
2408     gni_cq_entry_t          ev;
2409     gni_return_t            status;
2410     void                    *msg;   
2411     int                     slot;
2412
2413     while(1) {
2414         CMI_GNI_LOCK(global_gni_lock)
2415         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2416         CMI_GNI_UNLOCK(global_gni_lock)
2417         if(status != GNI_RC_SUCCESS) {
2418             break;
2419         }
2420
2421         slot = GNI_CQ_GET_INST_ID(ev);
2422         slot = ACK_GET_INDEX(slot);
2423         //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
2424
2425         //CMI_GNI_LOCK(ackpool_lock);
2426         msg = GetAckAddress(slot);
2427         //CMI_GNI_UNLOCK(ackpool_lock);
2428
2429         DecreaseMsgInSend(msg);
2430 #if ! USE_LRTS_MEMPOOL
2431        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2432 #else
2433         DecreaseMsgInSend(msg);
2434 #endif
2435         if(NoMsgInSend(msg))
2436             buffered_send_msg -= GetMempoolsize(msg);
2437         CmiFree(msg);
2438         AckPool_freeslot(slot);
2439     }
2440     if(status == GNI_RC_ERROR_RESOURCE)
2441     {
2442         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2443     }
2444 }
2445 #endif
2446
2447 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2448 {
2449     gni_cq_entry_t          ev;
2450     gni_return_t            status;
2451     uint64_t                type, inst_id;
2452     gni_post_descriptor_t   *tmp_pd;
2453     MSG_LIST                *ptr;
2454     CONTROL_MSG             *ack_msg_tmp;
2455     ACK_MSG                 *ack_msg;
2456     uint8_t                 msg_tag;
2457 #if CMK_DIRECT
2458     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2459 #endif
2460     SMSG_QUEUE         *queue = &smsg_queue;
2461
2462     while(1) {
2463         CMI_GNI_LOCK(my_cq_lock) 
2464         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2465         CMI_GNI_UNLOCK(my_cq_lock)
2466         if(status != GNI_RC_SUCCESS) break;
2467         
2468         type = GNI_CQ_GET_TYPE(ev);
2469         if (type == GNI_CQ_EVENT_TYPE_POST)
2470         {
2471             inst_id     = GNI_CQ_GET_INST_ID(ev);
2472 #if PRINT_SYH
2473             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2474 #endif
2475             CMI_GNI_LOCK(my_cq_lock)
2476             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2477             CMI_GNI_UNLOCK(my_cq_lock)
2478
2479             switch (tmp_pd->type) {
2480 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2481             case GNI_POST_RDMA_PUT:
2482 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2483                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2484 #endif
2485             case GNI_POST_FMA_PUT:
2486                 if(tmp_pd->amo_cmd == 1) {
2487 #if CMK_DIRECT
2488                     //sender ACK to receiver to trigger it is done
2489                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2490                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2491                     msg_tag = DIRECT_PUT_DONE_TAG;
2492 #endif
2493                 }
2494                 else {
2495                     CmiFree((void *)tmp_pd->local_addr);
2496                     MallocControlMsg(ack_msg_tmp);
2497                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2498                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2499                     ack_msg_tmp->length  = tmp_pd->length;
2500                     msg_tag = PUT_DONE_TAG;
2501                 }
2502                 break;
2503 #endif
2504             case GNI_POST_RDMA_GET:
2505             case GNI_POST_FMA_GET:  {
2506 #if  ! USE_LRTS_MEMPOOL
2507                 MallocControlMsg(ack_msg_tmp);
2508                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2509                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2510                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2511                 msg_tag = ACK_TAG;  
2512 #else
2513 #if CMK_WITH_STATS
2514                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2515 #endif
2516                 int seq_id = tmp_pd->cqwrite_value;
2517                 if(seq_id > 0)      // BIG_MSG
2518                 {
2519                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2520                     MallocControlMsg(ack_msg_tmp);
2521                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2522                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2523                     ack_msg_tmp->seq_id = seq_id;
2524                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2525                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2526                     ack_msg_tmp->length = tmp_pd->length;
2527                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2528                     msg_tag = BIG_MSG_TAG; 
2529                 } 
2530                 else
2531                 {
2532                     msg_tag = ACK_TAG; 
2533 #if  !REMOTE_EVENT && !CQWRITE
2534                     MallocAckMsg(ack_msg);
2535                     ack_msg->source_addr = tmp_pd->remote_addr;
2536 #endif
2537                 }
2538 #endif
2539                 break;
2540             }
2541             case  GNI_POST_CQWRITE:
2542                    FreePostDesc(tmp_pd);
2543                    continue;
2544             default:
2545                 CmiPrintf("type=%d\n", tmp_pd->type);
2546                 CmiAbort("PumpLocalTransactions: unknown type!");
2547             }      /* end of switch */
2548
2549 #if CMK_DIRECT
2550             if (tmp_pd->amo_cmd == 1) {
2551                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2552                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2553             }
2554             else
2555 #endif
2556             if (msg_tag == ACK_TAG) {
2557 #if !REMOTE_EVENT
2558 #if   !CQWRITE
2559                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2560                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2561 #else
2562                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2563 #endif
2564 #endif
2565             }
2566             else {
2567                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2568                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2569             }
2570 #if CMK_PERSISTENT_COMM
2571             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2572 #endif
2573             {
2574                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2575 #if PRINT_SYH
2576                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2577 #endif
2578                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2579                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2580
2581                     START_EVENT();
2582                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2583                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2584 #if MACHINE_DEBUG_LOG
2585                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2586                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2587                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2588 #endif
2589                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2590                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2591                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2592                 }else if(msg_tag == BIG_MSG_TAG){
2593                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2594                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2595                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2596                         START_EVENT();
2597 #if PRINT_SYH
2598                         printf("Pipeline msg done [%d]\n", myrank);
2599 #endif
2600 #if                 CMK_SMP_TRACE_COMMTHREAD
2601                         if( tmp_pd->cqwrite_value == 1)
2602                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2603 #endif
2604                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2605                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2606                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2607                     }
2608                 }
2609             }
2610             FreePostDesc(tmp_pd);
2611         }
2612     } //end while
2613     if(status == GNI_RC_ERROR_RESOURCE)
2614     {
2615         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2616         GNI_RC_CHECK("Smsg_tx_cq full", status);
2617     }
2618 }
2619
2620 static void  SendRdmaMsg()
2621 {
2622     gni_return_t            status = GNI_RC_SUCCESS;
2623     gni_mem_handle_t        msg_mem_hndl;
2624     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2625     RDMA_REQUEST            *pre = 0;
2626     uint64_t                register_size = 0;
2627     void                    *msg;
2628     int                     i;
2629 #if CMK_SMP
2630     int len = PCQueueLength(sendRdmaBuf);
2631     for (i=0; i<len; i++)
2632     {
2633         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2634         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2635         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2636         if (ptr == NULL) break;
2637 #else
2638     ptr = sendRdmaBuf;
2639     while (ptr!=0)
2640     {
2641 #endif 
2642         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2643         gni_post_descriptor_t *pd = ptr->pd;
2644         status = GNI_RC_SUCCESS;
2645         
2646         msg = (void*)(pd->local_addr);
2647         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2648         if(pd->cqwrite_value == 0) {
2649             if(NoMsgInRecv(msg))
2650                 register_size = GetMempoolsize((void*)(pd->local_addr));
2651             else
2652                 register_size = 0;
2653         }
2654         else
2655             register_size = 0;
2656 #if 0
2657         if(pd->cqwrite_value == 0)
2658         {
2659             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2660             {
2661                 msg = (void*)(pd->local_addr);
2662                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2663                 if(status == GNI_RC_SUCCESS)
2664                 {
2665                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2666                 }
2667             }else
2668             {
2669                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2670             }
2671             if(NoMsgInRecv( (void*)(pd->local_addr)))
2672                 register_size = GetMempoolsize((void*)(pd->local_addr));
2673             else
2674                 register_size = 0;
2675         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2676         {
2677             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl), NULL); 
2678         }
2679 #endif
2680         if(status == GNI_RC_SUCCESS)        //mem register good
2681         {
2682             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2683             CMI_GNI_LOCK(lock);
2684 #if REMOTE_EVENT
2685             if( pd->cqwrite_value == 0)
2686             {
2687                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2688                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2689                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2690             }
2691             else {
2692                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, myrank);
2693                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2694             }
2695 #endif
2696 #if CMK_WITH_STATS
2697             RDMA_TRY_SEND()
2698 #endif
2699             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2700             {
2701                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2702             }
2703             else
2704             {
2705                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2706             }
2707             CMI_GNI_UNLOCK(lock);
2708             
2709             if(status == GNI_RC_SUCCESS)    //post good
2710             {
2711 #if !CMK_SMP
2712                 tmp_ptr = ptr;
2713                 if(pre != 0) {
2714                     pre->next = ptr->next;
2715                 }
2716                 else {
2717                     sendRdmaBuf = ptr->next;
2718                 }
2719                 ptr = ptr->next;
2720                 FreeRdmaRequest(tmp_ptr);
2721 #endif
2722                 if(pd->cqwrite_value == 0)
2723                 {
2724 #if CMK_SMP_TRACE_COMMTHREAD 
2725                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2726 #endif
2727                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2728                 }
2729 #if  CMK_WITH_STATS
2730                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2731                 RDMA_TRANS_INIT(pd->sync_flag_addr/1000000.0)
2732 #endif
2733 #if MACHINE_DEBUG_LOG
2734                 buffered_recv_msg += register_size;
2735                 MACHSTATE(8, "GO request from buffered\n"); 
2736 #endif
2737 #if PRINT_SYH
2738                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
2739 #endif
2740             }else           // cannot post
2741             {
2742 #if CMK_SMP
2743                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2744 #else
2745                 pre = ptr;
2746                 ptr = ptr->next;
2747 #endif
2748 #if PRINT_SYH
2749                 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank, pd->cqwrite_value);
2750 #endif
2751                 break;
2752             }
2753         } else          //memory registration fails
2754         {
2755 #if CMK_SMP
2756             PCQueuePush(sendRdmaBuf, (char*)ptr);
2757 #else
2758             pre = ptr;
2759             ptr = ptr->next;
2760 #endif
2761         }
2762     } //end while
2763 #if ! CMK_SMP
2764     if(ptr == 0)
2765         sendRdmaTail = pre;
2766 #endif
2767 }
2768
2769 // return 1 if all messages are sent
2770 static int SendBufferMsg(SMSG_QUEUE *queue)
2771 {
2772     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2773     CONTROL_MSG         *control_msg_tmp;
2774     gni_return_t        status;
2775     int                 done = 1;
2776     uint64_t            register_size;
2777     void                *register_addr;
2778     int                 index_previous = -1;
2779 #if CMI_EXERT_SEND_CAP
2780     int                 sent_cnt = 0;
2781 #endif
2782
2783 #if CMK_SMP
2784     int          index = 0;
2785 #if ONE_SEND_QUEUE
2786     memset(destpe_avail, 0, mysize * sizeof(char));
2787     for (index=0; index<1; index++)
2788     {
2789         int i, len = PCQueueLength(queue->sendMsgBuf);
2790         for (i=0; i<len; i++) 
2791         {
2792             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2793             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2794             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2795             if(ptr == NULL) break;
2796             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2797                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2798                 continue;
2799             }
2800 #else
2801 #if SMP_LOCKS
2802     int nonempty = PCQueueLength(nonEmptyQueues);
2803     for(index =0; index<nonempty; index++)
2804     {
2805         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2806         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2807         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2808         if(current_list == NULL) break; 
2809         PCQueue current_queue= current_list-> sendSmsgBuf;
2810         CmiLock(current_list->lock);
2811         int i, len = PCQueueLength(current_queue);
2812         current_list->pushed = 0;
2813         CmiUnlock(current_list->lock);
2814 #else
2815     for(index =0; index<mysize; index++)
2816     {
2817         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2818         int i, len = PCQueueLength(current_queue);
2819 #endif
2820         for (i=0; i<len; i++) 
2821         {
2822             CMI_PCQUEUEPOP_LOCK(current_queue)
2823             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2824             CMI_PCQUEUEPOP_UNLOCK(current_queue)
2825             if (ptr == 0) break;
2826 #endif
2827 #else
2828     int index = queue->smsg_head_index;
2829     while(index != -1)
2830     {
2831         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2832         pre = 0;
2833         while(ptr != 0)
2834         {
2835 #endif
2836             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2837             status = GNI_RC_ERROR_RESOURCE;
2838             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2839                 /* connection not exists yet */
2840             }
2841             else
2842             switch(ptr->tag)
2843             {
2844             case SMALL_DATA_TAG:
2845                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2846                 if(status == GNI_RC_SUCCESS)
2847                 {
2848                     CmiFree(ptr->msg);
2849                 }
2850                 break;
2851             case LMSG_INIT_TAG:
2852                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2853                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
2854                 break;
2855             case ACK_TAG:
2856                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2857                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2858                 break;
2859             case BIG_MSG_TAG:
2860                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
2861                 if(status == GNI_RC_SUCCESS)
2862                 {
2863                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2864                 }
2865                 break;
2866 #if CMK_DIRECT
2867             case DIRECT_PUT_DONE_TAG:
2868                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
2869                 if(status == GNI_RC_SUCCESS)
2870                 {
2871                     free((CMK_DIRECT_HEADER*)ptr->msg);
2872                 }
2873                 break;
2874
2875 #endif
2876             default:
2877                 printf("Weird tag\n");
2878                 CmiAbort("should not happen\n");
2879             }       // end switch
2880             if(status == GNI_RC_SUCCESS)
2881             {
2882 #if PRINT_SYH
2883                 buffered_smsg_counter--;
2884                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2885 #endif
2886 #if !CMK_SMP
2887                 tmp_ptr = ptr;
2888                 if(pre)
2889                 {
2890                     ptr = pre ->next = ptr->next;
2891                 }else
2892                 {
2893                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2894                 }
2895                 FreeMsgList(tmp_ptr);
2896 #else
2897                 FreeMsgList(ptr);
2898 #endif
2899 #if CMI_EXERT_SEND_CAP
2900                 sent_cnt++;
2901                 if(sent_cnt == SEND_CAP)
2902                     break;
2903 #endif
2904             }else {
2905 #if CMK_SMP
2906 #if ONE_SEND_QUEUE
2907                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2908 #else
2909                 PCQueuePush(current_queue, (char*)ptr);
2910 #endif
2911 #else
2912                 pre = ptr;
2913                 ptr=ptr->next;
2914 #endif
2915                 done = 0;
2916                 if(status == GNI_RC_ERROR_RESOURCE)
2917                 {
2918 #if CMK_SMP && ONE_SEND_QUEUE 
2919                     destpe_avail[ptr->destNode] = 1;
2920 #else
2921                     break;
2922 #endif
2923                 }
2924             } 
2925         } //end while
2926 #if !CMK_SMP
2927         if(ptr == 0)
2928             queue->smsg_msglist_index[index].tail = pre;
2929         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2930         {
2931             if(index_previous != -1)
2932                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2933             else
2934                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2935         }
2936         else {
2937             index_previous = index;
2938         }
2939         index = queue->smsg_msglist_index[index].next;
2940 #else
2941 #if !ONE_SEND_QUEUE && SMP_LOCKS
2942         CmiLock(current_list->lock);
2943         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2944         {
2945             current_list->pushed = 1;
2946             PCQueuePush(nonEmptyQueues, current_list);
2947         }
2948         CmiUnlock(current_list->lock); 
2949 #endif
2950 #endif
2951
2952 #if CMI_EXERT_SEND_CAP
2953         if(sent_cnt == SEND_CAP)
2954                 break;
2955 #endif
2956     }   // end pooling for all cores
2957     return done;
2958 }
2959
2960 static void ProcessDeadlock();
2961 void LrtsAdvanceCommunication(int whileidle)
2962 {
2963     static int count = 0;
2964     /*  Receive Msg first */
2965 #if CMK_SMP_TRACE_COMMTHREAD
2966     double startT, endT;
2967 #endif
2968     if (useDynamicSMSG && whileidle)
2969     {
2970 #if CMK_SMP_TRACE_COMMTHREAD
2971         startT = CmiWallTimer();
2972 #endif
2973         PumpDatagramConnection();
2974 #if CMK_SMP_TRACE_COMMTHREAD
2975         endT = CmiWallTimer();
2976         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
2977 #endif
2978     }
2979
2980 #if CMK_SMP_TRACE_COMMTHREAD
2981     startT = CmiWallTimer();
2982 #endif
2983     PumpNetworkSmsg();
2984     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2985 #if CMK_SMP_TRACE_COMMTHREAD
2986     endT = CmiWallTimer();
2987     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
2988 #endif
2989
2990 #if CMK_SMP_TRACE_COMMTHREAD
2991     startT = CmiWallTimer();
2992 #endif
2993     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2994     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
2995 #if CMK_SMP_TRACE_COMMTHREAD
2996     endT = CmiWallTimer();
2997     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
2998 #endif
2999
3000 #if CMK_SMP_TRACE_COMMTHREAD
3001     startT = CmiWallTimer();
3002 #endif
3003     PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock);
3004
3005 #if CQWRITE
3006     PumpCqWriteTransactions();
3007 #endif
3008
3009 #if REMOTE_EVENT
3010     PumpRemoteTransactions();
3011 #endif
3012
3013     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3014 #if CMK_SMP_TRACE_COMMTHREAD
3015     endT = CmiWallTimer();
3016     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3017 #endif
3018  
3019     /* Send buffered Message */
3020 #if CMK_SMP_TRACE_COMMTHREAD
3021     startT = CmiWallTimer();
3022 #endif
3023 #if CMK_USE_OOB
3024     if (SendBufferMsg(&smsg_oob_queue) == 1)
3025 #endif
3026     {
3027     SendBufferMsg(&smsg_queue);
3028     }
3029     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3030 #if CMK_SMP_TRACE_COMMTHREAD
3031     endT = CmiWallTimer();
3032     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3033 #endif
3034
3035 #if CMK_SMP_TRACE_COMMTHREAD
3036     startT = CmiWallTimer();
3037 #endif
3038     SendRdmaMsg();
3039     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3040 #if CMK_SMP_TRACE_COMMTHREAD
3041     endT = CmiWallTimer();
3042     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3043 #endif
3044
3045 #if CMK_SMP && ! LARGEPAGE
3046     if (_detected_hang)  ProcessDeadlock();
3047 #endif
3048 }
3049
3050 /* useDynamicSMSG */
3051 static void _init_dynamic_smsg()
3052 {
3053     gni_return_t status;
3054     uint32_t     vmdh_index = -1;
3055     int i;
3056
3057     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3058     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3059     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3060     for(i=0; i<mysize; i++) {
3061         smsg_connected_flag[i] = 0;
3062         smsg_attr_vector_local[i] = NULL;
3063         smsg_attr_vector_remote[i] = NULL;
3064     }
3065     if(mysize <=512)
3066     {
3067         SMSG_MAX_MSG = 4096;
3068     }else if (mysize <= 4096)
3069     {
3070         SMSG_MAX_MSG = 4096/mysize * 1024;
3071     }else if (mysize <= 16384)
3072     {
3073         SMSG_MAX_MSG = 512;
3074     }else {
3075         SMSG_MAX_MSG = 256;
3076     }
3077
3078     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3079     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3080     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3081     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3082     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3083
3084     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3085     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3086     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3087     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3088     mailbox_list->offset = 0;
3089     mailbox_list->next = 0;
3090     
3091     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3092         mailbox_list->size, smsg_rx_cqh,
3093         GNI_MEM_READWRITE,   
3094         vmdh_index,
3095         &(mailbox_list->mem_hndl));
3096     GNI_RC_CHECK("MEMORY registration for smsg", status);
3097
3098     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3099     GNI_RC_CHECK("Unbound EP", status);
3100     
3101     alloc_smsg_attr(&send_smsg_attr);
3102
3103     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3104     GNI_RC_CHECK("post unbound datagram", status);
3105
3106       /* always pre-connect to proc 0 */
3107     //if (myrank != 0) connect_to(0);
3108 }
3109
3110 static void _init_static_smsg()
3111 {
3112     gni_smsg_attr_t      *smsg_attr;
3113     gni_smsg_attr_t      remote_smsg_attr;
3114     gni_smsg_attr_t      *smsg_attr_vec;
3115     gni_mem_handle_t     my_smsg_mdh_mailbox;
3116     int      ret, i;
3117     gni_return_t status;
3118     uint32_t              vmdh_index = -1;
3119     mdh_addr_t            base_infor;
3120     mdh_addr_t            *base_addr_vec;
3121     char *env;
3122
3123     if(mysize <=512)
3124     {
3125         SMSG_MAX_MSG = 1024;
3126     }else if (mysize <= 4096)
3127     {
3128         SMSG_MAX_MSG = 1024;
3129     }else if (mysize <= 16384)
3130     {
3131         SMSG_MAX_MSG = 512;
3132     }else {
3133         SMSG_MAX_MSG = 256;
3134     }
3135     
3136     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3137     if (env) SMSG_MAX_MSG = atoi(env);
3138     CmiAssert(SMSG_MAX_MSG > 0);
3139
3140     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3141     
3142     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3143     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3144     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3145     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3146     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3147     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3148     CmiAssert(ret == 0);
3149     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3150     
3151     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3152             smsg_memlen*(mysize), smsg_rx_cqh,
3153             GNI_MEM_READWRITE,   
3154             vmdh_index,
3155             &my_smsg_mdh_mailbox);
3156     register_memory_size += smsg_memlen*(mysize);
3157     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3158
3159     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3160     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3161
3162     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3163     base_infor.mdh =  my_smsg_mdh_mailbox;
3164     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3165
3166     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3167  
3168     for(i=0; i<mysize; i++)
3169     {
3170         if(i==myrank)
3171             continue;
3172         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3173         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3174         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3175         smsg_attr[i].mbox_offset = i*smsg_memlen;
3176         smsg_attr[i].buff_size = smsg_memlen;
3177         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3178         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3179     }
3180
3181     for(i=0; i<mysize; i++)
3182     {
3183         if (myrank == i) continue;
3184
3185         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3186         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3187         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3188         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3189         remote_smsg_attr.buff_size = smsg_memlen;
3190         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3191         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3192
3193         /* initialize the smsg channel */
3194         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3195         GNI_RC_CHECK("SMSG Init", status);
3196     } //end initialization
3197
3198     free(base_addr_vec);
3199     free(smsg_attr);
3200
3201     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3202     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3203
3204
3205 inline
3206 static void _init_send_queue(SMSG_QUEUE *queue)
3207 {
3208      int i;
3209 #if ONE_SEND_QUEUE
3210      queue->sendMsgBuf = PCQueueCreate();
3211      destpe_avail = (char*)malloc(mysize * sizeof(char));
3212 #else
3213      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3214 #if CMK_SMP && SMP_LOCKS
3215      nonEmptyQueues = PCQueueCreate();
3216 #endif
3217      for(i =0; i<mysize; i++)
3218      {
3219 #if CMK_SMP
3220          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3221 #if SMP_LOCKS
3222          queue->smsg_msglist_index[i].pushed = 0;
3223          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3224 #endif
3225 #else
3226          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3227          queue->smsg_msglist_index[i].next = -1;
3228          queue->smsg_head_index = -1;
3229 #endif
3230         
3231      }
3232 #endif
3233 }
3234
3235 inline
3236 static void _init_smsg()
3237 {
3238     if(mysize > 1) {
3239         if (useDynamicSMSG)
3240             _init_dynamic_smsg();
3241         else
3242             _init_static_smsg();
3243     }
3244
3245     _init_send_queue(&smsg_queue);
3246 #if CMK_USE_OOB
3247     _init_send_queue(&smsg_oob_queue);
3248 #endif
3249 }
3250
3251 static void _init_static_msgq()
3252 {
3253     gni_return_t status;
3254     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3255     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3256     msgq_attrs.smsg_q_sz = 1;
3257     msgq_attrs.rcv_pool_sz = 1;
3258     msgq_attrs.num_msgq_eps = 2;
3259     msgq_attrs.nloc_insts = 8;
3260     msgq_attrs.modes = 0;
3261     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3262
3263     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3264     GNI_RC_CHECK("MSGQ Init", status);
3265
3266
3267 }
3268
3269
3270 static CmiUInt8 total_mempool_size = 0;
3271 static CmiUInt8 total_mempool_calls = 0;
3272
3273 #if USE_LRTS_MEMPOOL
3274 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3275 {
3276     void *pool;
3277     int ret;
3278     gni_return_t status = GNI_RC_SUCCESS;
3279
3280     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3281     if (*size < default_size) *size = default_size;
3282 #if LARGEPAGE
3283     // round up to be multiple of _tlbpagesize
3284     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3285     *size = ALIGNHUGEPAGE(*size);
3286 #endif
3287     total_mempool_size += *size;
3288     total_mempool_calls += 1;
3289 #if   !LARGEPAGE
3290     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3291     {
3292         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3293         CmiAbort("alloc_mempool_block");
3294     }
3295 #endif
3296 #if LARGEPAGE
3297     pool = my_get_huge_pages(*size);
3298     ret = pool==NULL;
3299 #else
3300     ret = posix_memalign(&pool, ALIGNBUF, *size);
3301 #endif
3302     if (ret != 0) {
3303 #if CMK_SMP && STEAL_MEMPOOL
3304       pool = steal_mempool_block(size, mem_hndl);
3305       if (pool != NULL) return pool;
3306 #endif
3307       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3308       if (ret == ENOMEM)
3309         CmiAbort("alloc_mempool_block: out of memory.");
3310       else
3311         CmiAbort("alloc_mempool_block: posix_memalign failed");
3312     }
3313 #if LARGEPAGE
3314     CmiMemLock();
3315     register_count++;
3316     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3317     CmiMemUnlock();
3318     if(status != GNI_RC_SUCCESS) {
3319         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3320 sweep_mempool(CpvAccess(mempool));
3321     }
3322     GNI_RC_CHECK("MEMORY_REGISTER", status);
3323 #else
3324     SetMemHndlZero((*mem_hndl));
3325 #endif
3326     return pool;
3327 }
3328
3329 // ptr is a block head pointer
3330 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3331 {
3332     if(!(IsMemHndlZero(mem_hndl)))
3333     {
3334         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3335     }
3336 #if LARGEPAGE
3337     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3338 #else
3339     free(ptr);
3340 #endif
3341 }
3342 #endif
3343
3344 void LrtsPreCommonInit(int everReturn){
3345 #if USE_LRTS_MEMPOOL
3346     CpvInitialize(mempool_type*, mempool);
3347     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3348     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3349 #endif
3350 }
3351
3352 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3353 {
3354     register int            i;
3355     int                     rc;
3356     int                     device_id = 0;
3357     unsigned int            remote_addr;
3358     gni_cdm_handle_t        cdm_hndl;
3359     gni_return_t            status = GNI_RC_SUCCESS;
3360     uint32_t                vmdh_index = -1;
3361     uint8_t                 ptag;
3362     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3363     int                     first_spawned;
3364     int                     physicalID;
3365     char                   *env;
3366
3367     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3368     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3369     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3370    
3371     status = PMI_Init(&first_spawned);
3372     GNI_RC_CHECK("PMI_Init", status);
3373
3374     status = PMI_Get_size(&mysize);
3375     GNI_RC_CHECK("PMI_Getsize", status);
3376
3377     status = PMI_Get_rank(&myrank);
3378     GNI_RC_CHECK("PMI_getrank", status);
3379
3380     //physicalID = CmiPhysicalNodeID(myrank);
3381     
3382     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3383
3384     *myNodeID = myrank;
3385     *numNodes = mysize;
3386   
3387 #if MULTI_THREAD_SEND
3388     /* Currently, we only consider the case that comm. thread will only recv msgs */
3389     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3390 #endif
3391
3392     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3393     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3394     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3395
3396     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3397     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3398     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3399
3400     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3401     if (env) useDynamicSMSG = 1;
3402     if (!useDynamicSMSG)
3403       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3404     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3405     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3406     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3407     
3408     if(myrank == 0)
3409     {
3410         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3411         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3412     }
3413 #ifdef USE_ONESIDED
3414     onesided_init(NULL, &onesided_hnd);
3415
3416     // this is a GNI test, so use the libonesided bypass functionality
3417     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3418     local_addr = gniGetNicAddress();
3419 #else
3420     ptag = get_ptag();
3421     cookie = get_cookie();
3422 #if 0
3423     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3424 #endif
3425     //Create and attach to the communication  domain */
3426     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3427     GNI_RC_CHECK("GNI_CdmCreate", status);
3428     //* device id The device id is the minor number for the device
3429     //that is assigned to the device by the system when the device is created.
3430     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3431     //where X is the device number 0 default 
3432     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3433     GNI_RC_CHECK("GNI_CdmAttach", status);
3434     local_addr = get_gni_nic_address(0);
3435 #endif
3436     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3437     _MEMCHECK(MPID_UGNI_AllAddr);
3438     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3439     /* create the local completion queue */
3440     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3441     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3442     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3443     
3444     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3445     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3446     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3447
3448     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3449     GNI_RC_CHECK("Create CQ (rx)", status);
3450     
3451     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3452     GNI_RC_CHECK("Create Post CQ (rx)", status);
3453     
3454     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3455     //GNI_RC_CHECK("Create BTE CQ", status);
3456
3457     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3458     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3459     _MEMCHECK(ep_hndl_array);
3460 #if MULTI_THREAD_SEND 
3461     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3462     //default_tx_cq_lock = CmiCreateLock();
3463     rdma_tx_cq_lock = CmiCreateLock();
3464     smsg_rx_cq_lock = CmiCreateLock();
3465     //global_gni_lock  = CmiCreateLock();
3466     //rx_cq_lock  = CmiCreateLock();
3467 #endif
3468     for (i=0; i<mysize; i++) {
3469         if(i == myrank) continue;
3470         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3471         GNI_RC_CHECK("GNI_EpCreate ", status);   
3472         remote_addr = MPID_UGNI_AllAddr[i];
3473         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3474         GNI_RC_CHECK("GNI_EpBind ", status);   
3475     }
3476
3477     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3478     _init_smsg();
3479     PMI_Barrier();
3480
3481 #if     USE_LRTS_MEMPOOL
3482     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3483     if (env) {
3484         _totalmem = CmiReadSize(env);
3485         if (myrank == 0)
3486             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3487     }
3488
3489     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3490     if (env) _mempool_size = CmiReadSize(env);
3491     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3492         _mempool_size = CmiReadSize(env);
3493
3494
3495     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3496     if (env) {
3497         MAX_REG_MEM = CmiReadSize(env);
3498         user_set_flag = 1;
3499     }
3500     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3501         MAX_REG_MEM = CmiReadSize(env);
3502         user_set_flag = 1;
3503     }
3504
3505     env = getenv("CHARM_UGNI_SEND_MAX");
3506     if (env) {
3507         MAX_BUFF_SEND = CmiReadSize(env);
3508         user_set_flag = 1;
3509     }
3510     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3511         MAX_BUFF_SEND = CmiReadSize(env);
3512         user_set_flag = 1;
3513     }
3514
3515     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3516     if (env) {
3517         _mempool_size_limit = CmiReadSize(env);
3518     }
3519
3520     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3521     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3522
3523     if (myrank==0) {
3524         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3525         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3526         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3527             /* memblock can expand to BIG_MSG * 2 size */
3528             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3529             CmiAbort("mempool maximum size is too small. \n");
3530         }
3531 #if MULTI_THREAD_SEND
3532         printf("Charm++> worker thread sending messages\n");
3533 #elif COMM_THREAD_SEND
3534         printf("Charm++> only comm thread send/recv messages\n");
3535 #endif
3536     }
3537
3538 #endif     /* end of USE_LRTS_MEMPOOL */
3539
3540     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3541     if (env) {
3542         BIG_MSG = CmiReadSize(env);
3543         if (BIG_MSG < ONE_SEG)
3544           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3545     }
3546     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3547     if (env) {
3548         BIG_MSG_PIPELINE = atoi(env);
3549     }
3550
3551     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3552     if (env) _checkProgress = 0;
3553     if (mysize == 1) _checkProgress = 0;
3554
3555     
3556     /*
3557     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3558     if (env) 
3559         _tlbpagesize = CmiReadSize(env);
3560     */
3561     /* real gethugepagesize() is only available when hugetlb module linked */
3562     _tlbpagesize = gethugepagesize();
3563     if (myrank == 0) {
3564         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3565     }
3566
3567 #if LARGEPAGE
3568     if (_tlbpagesize == 4096) {
3569         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3570     }
3571 #endif
3572
3573     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3574
3575     /* init DMA buffer for medium message */
3576
3577     //_init_DMA_buffer();
3578     
3579     free(MPID_UGNI_AllAddr);
3580 #if CMK_SMP
3581     sendRdmaBuf = PCQueueCreate();
3582 #else
3583     sendRdmaBuf = 0;
3584 #endif
3585 #if MACHINE_DEBUG_LOG
3586     char ln[200];
3587     sprintf(ln,"debugLog.%d",myrank);
3588     debugLog=fopen(ln,"w");
3589 #endif
3590
3591 #if CMK_WITH_STATS
3592     if (print_stats){
3593         char ln[200];
3594         int code = mkdir("counters", 00777); 
3595         sprintf(ln,"counters/statistics.%d.%d", mysize, myrank);
3596         counterLog=fopen(ln,"w");
3597     }
3598 #endif
3599 //    NTK_Init();
3600 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3601
3602 #if  REMOTE_EVENT
3603     AckPool_init();
3604 #endif
3605
3606 #if CMK_WITH_STATS
3607     init_comm_stats();
3608 #endif
3609 }
3610
3611 void* LrtsAlloc(int n_bytes, int header)
3612 {
3613     void *ptr = NULL;
3614 #if 0
3615     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3616 #endif
3617     if(n_bytes <= SMSG_MAX_MSG)
3618     {
3619         int totalsize = n_bytes+header;
3620         ptr = malloc(totalsize);
3621     }
3622     else {
3623         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3624 #if     USE_LRTS_MEMPOOL
3625         n_bytes = ALIGN64(n_bytes);
3626         if(n_bytes < BIG_MSG)
3627         {
3628             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3629             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3630         }else 
3631         {
3632 #if LARGEPAGE
3633             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3634             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3635             char *res = my_get_huge_pages(n_bytes);
3636 #else
3637             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3638 #endif
3639             if (res) ptr = res + ALIGNBUF - header;
3640         }
3641 #else
3642         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3643         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3644         ptr = res + ALIGNBUF - header;
3645 #endif
3646     }
3647 #if CMK_PERSISTENT_COMM
3648     if (ptr) SetMemHndlZero(MEMHFIELD((char*)ptr+header));
3649 #endif
3650     return ptr;
3651 }
3652
3653 void  LrtsFree(void *msg)
3654 {
3655     CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3656 #if CMK_PERSISTENT_COMM
3657     if (!IsMemHndlZero(MEMHFIELD((char*)msg+sizeof(CmiChunkHeader)))) return;
3658 #endif
3659     if (size <= SMSG_MAX_MSG)
3660         free(msg);
3661     else {
3662         size = ALIGN64(size);
3663         if(size>=BIG_MSG)
3664         {
3665 #if LARGEPAGE
3666             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3667             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3668 #else
3669             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3670 #endif
3671         }
3672         else {
3673 #if    USE_LRTS_MEMPOOL
3674 #if CMK_SMP
3675             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3676 #else
3677             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3678 #endif
3679 #else
3680             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3681 #endif
3682         }
3683     }
3684 }
3685
3686 void LrtsExit()
3687 {
3688 #if CMK_WITH_STATS
3689 #if CMK_SMP
3690     if(CmiMyRank() == CmiMyNodeSize())
3691 #endif
3692     if (print_stats) print_comm_stats();
3693 #endif
3694     /* free memory ? */
3695 #if USE_LRTS_MEMPOOL
3696     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
3697     mempool_destroy(CpvAccess(mempool));
3698 #endif
3699     PMI_Finalize();
3700     exit(0);
3701 }
3702
3703 void LrtsDrainResources()
3704 {
3705     if(mysize == 1) return;
3706     while (
3707 #if CMK_USE_OOB
3708            !SendBufferMsg(&smsg_oob_queue) ||
3709 #endif
3710            !SendBufferMsg(&smsg_queue) 
3711           )
3712     {
3713         if (useDynamicSMSG)
3714             PumpDatagramConnection();
3715         PumpNetworkSmsg();
3716         PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3717         PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
3718         SendRdmaMsg();
3719     }
3720     PMI_Barrier();
3721 }
3722
3723 void LrtsAbort(const char *message) {
3724     printf("CmiAbort is calling on PE:%d\n", myrank);
3725     CmiPrintStackTrace(0);
3726     PMI_Abort(-1, message);
3727 }
3728
3729 /**************************  TIMER FUNCTIONS **************************/
3730 #if CMK_TIMER_USE_SPECIAL
3731 /* MPI calls are not threadsafe, even the timer on some machines */
3732 static CmiNodeLock  timerLock = 0;
3733 static int _absoluteTime = 0;
3734 static int _is_global = 0;
3735 static struct timespec start_ts;
3736
3737 inline int CmiTimerIsSynchronized() {
3738     return 0;
3739 }
3740
3741 inline int CmiTimerAbsolute() {
3742     return _absoluteTime;
3743 }
3744
3745 double CmiStartTimer() {
3746     return 0.0;
3747 }
3748
3749 double CmiInitTime() {
3750     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3751 }
3752
3753 void CmiTimerInit(char **argv) {
3754     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3755     if (_absoluteTime && CmiMyPe() == 0)
3756         printf("Charm++> absolute  timer is used\n");
3757     
3758     _is_global = CmiTimerIsSynchronized();
3759
3760
3761     if (_is_global) {
3762         if (CmiMyRank() == 0) {
3763             clock_gettime(CLOCK_MONOTONIC, &start_ts);
3764         }
3765     } else { /* we don't have a synchronous timer, set our own start time */
3766         CmiBarrier();
3767         CmiBarrier();
3768         CmiBarrier();
3769         clock_gettime(CLOCK_MONOTONIC, &start_ts);
3770     }
3771     CmiNodeAllBarrier();          /* for smp */
3772 }
3773
3774 /**
3775  * Since the timerLock is never created, and is
3776  * always NULL, then all the if-condition inside
3777  * the timer functions could be disabled right
3778  * now in the case of SMP.
3779  */
3780 double CmiTimer(void) {
3781     struct timespec now_ts;
3782     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3783     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3784         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3785 }
3786
3787 double CmiWallTimer(void) {
3788     struct timespec now_ts;
3789     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3790     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3791         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
3792 }
3793
3794 double CmiCpuTimer(void) {
3795     struct timespec now_ts;
3796     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3797     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3798         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3799 }
3800
3801 #endif
3802 /************Barrier Related Functions****************/
3803
3804 int CmiBarrier()
3805 {
3806     gni_return_t status;
3807
3808 #if CMK_SMP
3809     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
3810     CmiNodeAllBarrier();
3811     if (CmiMyRank() == CmiMyNodeSize())
3812 #else
3813     if (CmiMyRank() == 0)
3814 #endif
3815     {
3816         /**
3817          *  The call of CmiBarrier is usually before the initialization
3818          *  of trace module of Charm++, therefore, the START_EVENT
3819          *  and END_EVENT are disabled here. -Chao Mei
3820          */
3821         /*START_EVENT();*/
3822         status = PMI_Barrier();
3823         GNI_RC_CHECK("PMI_Barrier", status);
3824         /*END_EVENT(10);*/
3825     }
3826     CmiNodeAllBarrier();
3827     return 0;
3828
3829 }
3830 #if CMK_DIRECT
3831 #include "machine-cmidirect.c"
3832 #endif
3833 #if CMK_PERSISTENT_COMM
3834 #include "machine-persistent.c"
3835 #endif
3836
3837