resolve conflict
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <gni_pub.h>
38 #include <pmi.h>
39
40 //#include <numatoolkit.h>
41
42 #include "converse.h"
43
44 #define     LARGEPAGE           0
45
46 #if LARGEPAGE
47 #include <hugetlbfs.h>
48 #endif
49
50 #if CMK_DIRECT
51 #include "cmidirect.h"
52 #endif
53 #define PRINT_SYH  0
54
55 #define USE_LRTS_MEMPOOL                  1
56
57 #define REMOTE_EVENT                      0
58
59 #define CMI_EXERT_SEND_CAP      0
60 #define CMI_EXERT_RECV_CAP      0
61
62 #if CMI_EXERT_SEND_CAP
63 #define SEND_CAP 16
64 #endif
65
66 #if CMI_EXERT_RECV_CAP
67 #define RECV_CAP 2
68 #endif
69
70 #if CMK_SMP
71 #define COMM_THREAD_SEND 1
72 //#define MULTI_THREAD_SEND 1
73 #endif
74
75 #if CMK_SMP
76 #define PIGGYBACK_ACK                        0
77 #endif
78
79 // Trace communication thread
80 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
81 #define TRACE_THRESHOLD     0.00005
82 #define CMI_MPI_TRACE_MOREDETAILED 0
83 #undef CMI_MPI_TRACE_USEREVENTS
84 #define CMI_MPI_TRACE_USEREVENTS 1
85 #else
86 #undef CMK_SMP_TRACE_COMMTHREAD
87 #define CMK_SMP_TRACE_COMMTHREAD 0
88 #endif
89
90 #define CMK_TRACE_COMMOVERHEAD 0
91 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
92 #undef CMI_MPI_TRACE_USEREVENTS
93 #define CMI_MPI_TRACE_USEREVENTS 1
94 #else
95 #undef CMK_TRACE_COMMOVERHEAD
96 #define CMK_TRACE_COMMOVERHEAD 0
97 #endif
98
99 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
100 CpvStaticDeclare(double, projTraceStart);
101 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
102 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
103 #else
104 #define  START_EVENT()
105 #define  END_EVENT(x)
106 #endif
107
108 #if USE_LRTS_MEMPOOL
109
110 #define oneMB (1024ll*1024)
111 #define oneGB (1024ll*1024*1024)
112
113 static CmiInt8 _mempool_size = 8*oneMB;
114 static CmiInt8 _expand_mem =  4*oneMB;
115 static CmiInt8 _mempool_size_limit = 0;
116
117 static CmiInt8 _totalmem = 0.8*oneGB;
118
119 #if LARGEPAGE
120 static int BIG_MSG  =  16*oneMB;
121 static int ONE_SEG  =  4*oneMB;
122 #else
123 static int BIG_MSG  =  4*oneMB;
124 static int ONE_SEG  =  2*oneMB;
125 #endif
126 static int BIG_MSG_PIPELINE = 4;
127
128 // dynamic flow control
129 static CmiInt8 buffered_send_msg = 0;
130 static CmiInt8 register_memory_size = 0;
131
132 #if LARGEPAGE
133 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
134 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
135 static CmiInt8  register_count = 0;
136 #else
137 #if CMK_SMP && COMM_THREAD_SEND 
138 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
139 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
140 #else
141 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
142 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
143 #endif
144
145
146 #endif
147
148 #endif     /* end USE_LRTS_MEMPOOL */
149
150 #if CMK_SMP && MULTI_THREAD_SEND
151 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
152 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
153 #else
154 #define     CMI_GNI_LOCK
155 #define     CMI_GNI_UNLOCK
156 #endif
157
158 static int _tlbpagesize = 4096;
159
160 //static int _smpd_count  = 0;
161
162 static int   user_set_flag  = 0;
163
164 static int _checkProgress = 1;             /* check deadlock */
165 static int _detected_hang = 0;
166
167 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
168
169 // dynamic SMSG
170 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
171
172 static int avg_smsg_connection = 32;
173 static int                 *smsg_connected_flag= 0;
174 static gni_smsg_attr_t     **smsg_attr_vector_local;
175 static gni_smsg_attr_t     **smsg_attr_vector_remote;
176 static gni_ep_handle_t     ep_hndl_unbound;
177 static gni_smsg_attr_t     send_smsg_attr;
178 static gni_smsg_attr_t     recv_smsg_attr;
179
180 typedef struct _dynamic_smsg_mailbox{
181    void     *mailbox_base;
182    int      size;
183    int      offset;
184    gni_mem_handle_t  mem_hndl;
185    struct      _dynamic_smsg_mailbox  *next;
186 }dynamic_smsg_mailbox_t;
187
188 static dynamic_smsg_mailbox_t  *mailbox_list;
189
190 int         rdma_id = 0;
191
192 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
193 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
194
195 #if PRINT_SYH
196 int         lrts_send_msg_id = 0;
197 int         lrts_local_done_msg = 0;
198 int         lrts_send_rdma_success = 0;
199 #endif
200
201 #include "machine.h"
202
203 #include "pcqueue.h"
204
205 #include "mempool.h"
206
207 #if CMK_PERSISTENT_COMM
208 #include "machine-persistent.h"
209 #endif
210
211 //#define  USE_ONESIDED 1
212 #ifdef USE_ONESIDED
213 //onesided implementation is wrong, since no place to restore omdh
214 #include "onesided.h"
215 onesided_hnd_t   onesided_hnd;
216 onesided_md_t    omdh;
217 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
218
219 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
220
221 #else
222 uint8_t   onesided_hnd, omdh;
223 #if REMOTE_EVENT
224 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
225          status = GNI_RC_ERROR_NOMEM;} \
226         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
227                 if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
228 #else
229 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
230     do {   \
231         if (register_memory_size + size >= MAX_REG_MEM) { \
232             status = GNI_RC_ERROR_NOMEM; \
233         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
234             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
235     } while(0)
236 #endif
237 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
238     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
239              register_memory_size -= size; \
240          else CmiAbort("MEM_DEregister");  \
241     } while (0)
242 #endif
243
244 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
245 #define   IncreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)++
246 #define   DecreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)--
247 #define   IncreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)++
248 #define   DecreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)--
249 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
250 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
251 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
252 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
253 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
254 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
255 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
256 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
257 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
258
259 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
260 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
261
262 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
263 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
264 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
265 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
266
267 #define ALIGNBUF                64
268
269 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
270 /* If SMSG is not used */
271
272 #define FMA_PER_CORE  1024
273 #define FMA_BUFFER_SIZE 1024
274
275 /* If SMSG is used */
276 static int  SMSG_MAX_MSG = 1024;
277 #define SMSG_MAX_CREDIT 72 
278
279 #define MSGQ_MAXSIZE       2048
280 /* large message transfer with FMA or BTE */
281 #define LRTS_GNI_RDMA_THRESHOLD  1024 
282
283 #if CMK_SMP
284 static int  REMOTE_QUEUE_ENTRIES=163840; 
285 static int LOCAL_QUEUE_ENTRIES=163840; 
286 #else
287 static int  REMOTE_QUEUE_ENTRIES=20480;
288 static int LOCAL_QUEUE_ENTRIES=20480; 
289 #endif
290
291 #define BIG_MSG_TAG             0x26
292 #define PUT_DONE_TAG            0x28
293 #define DIRECT_PUT_DONE_TAG     0x29
294 #define ACK_TAG                 0x30
295 /* SMSG is data message */
296 #define SMALL_DATA_TAG          0x31
297 #define SMALL_DATA_ACK_TAG      0x32
298 /* SMSG is a control message to initialize a BTE */
299 #define LMSG_INIT_TAG           0x39 
300
301 #define DEBUG
302 #ifdef GNI_RC_CHECK
303 #undef GNI_RC_CHECK
304 #endif
305 #ifdef DEBUG
306 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
307 #else
308 #define GNI_RC_CHECK(msg,rc)
309 #endif
310
311 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
312 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
313 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
314
315 static int useStaticMSGQ = 0;
316 static int useStaticFMA = 0;
317 static int mysize, myrank;
318 gni_nic_handle_t      nic_hndl;
319
320 typedef struct {
321     gni_mem_handle_t mdh;
322     uint64_t addr;
323 } mdh_addr_t ;
324 // this is related to dynamic SMSG
325
326 typedef struct mdh_addr_list{
327     gni_mem_handle_t mdh;
328    void *addr;
329     struct mdh_addr_list *next;
330 }mdh_addr_list_t;
331
332 static unsigned int         smsg_memlen;
333 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
334 mdh_addr_t          setup_mem;
335 mdh_addr_t          *smsg_connection_vec = 0;
336 gni_mem_handle_t    smsg_connection_memhndl;
337 static int          smsg_expand_slots = 10;
338 static int          smsg_available_slot = 0;
339 static void         *smsg_mailbox_mempool = 0;
340 mdh_addr_list_t     *smsg_dynamic_list = 0;
341
342 static void             *smsg_mailbox_base;
343 gni_msgq_attr_t         msgq_attrs;
344 gni_msgq_handle_t       msgq_handle;
345 gni_msgq_ep_attr_t      msgq_ep_attrs;
346 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
347
348 /* =====Beginning of Declarations of Machine Specific Variables===== */
349 static int cookie;
350 static int modes = 0;
351 static gni_cq_handle_t       smsg_rx_cqh = NULL;
352 static gni_cq_handle_t       smsg_tx_cqh = NULL;
353 static gni_cq_handle_t       post_rx_cqh = NULL;
354 static gni_cq_handle_t       post_tx_cqh = NULL;
355 static gni_ep_handle_t       *ep_hndl_array;
356 #if CMK_SMP && MULTI_THREAD_SEND
357 static CmiNodeLock           *ep_lock_array;
358 static CmiNodeLock           tx_cq_lock; 
359 static CmiNodeLock           rx_cq_lock;
360 static CmiNodeLock           *mempool_lock;
361 #endif
362
363 typedef struct msg_list
364 {
365     uint32_t destNode;
366     uint32_t size;
367     void *msg;
368     uint8_t tag;
369 #if !CMK_SMP
370     struct msg_list *next;
371 #endif
372 }MSG_LIST;
373
374
375 typedef struct control_msg
376 {
377     uint64_t            source_addr;    /* address from the start of buffer  */
378     uint64_t            dest_addr;      /* address from the start of buffer */
379     int                 total_length;   /* total length */
380     int                 length;         /* length of this packet */
381     uint8_t             seq_id;         //big message   0 meaning single message
382     gni_mem_handle_t    source_mem_hndl;
383     struct control_msg *next;
384 } CONTROL_MSG;
385
386 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
387
388 typedef struct ack_msg
389 {
390     uint64_t            source_addr;    /* address from the start of buffer  */
391 #if ! USE_LRTS_MEMPOOL
392     gni_mem_handle_t    source_mem_hndl;
393     int                 length;          /* total length */
394 #endif
395     struct ack_msg     *next;
396 } ACK_MSG;
397
398 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
399
400 #if CMK_DIRECT
401 typedef struct{
402     uint64_t    handler_addr;
403 }CMK_DIRECT_HEADER;
404
405 typedef struct {
406     char core[CmiMsgHeaderSizeBytes];
407     uint64_t handler;
408 }cmidirectMsg;
409
410 //SYH
411 CpvDeclare(int, CmiHandleDirectIdx);
412 void CmiHandleDirectMsg(cmidirectMsg* msg)
413 {
414
415     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
416    (*(_handle->callbackFnPtr))(_handle->callbackData);
417    CmiFree(msg);
418 }
419
420 void CmiDirectInit()
421 {
422     CpvInitialize(int,  CmiHandleDirectIdx);
423     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
424 }
425
426 #endif
427 typedef struct  rmda_msg
428 {
429     int                   destNode;
430     gni_post_descriptor_t *pd;
431 #if !CMK_SMP
432     struct  rmda_msg      *next;
433 #endif
434 }RDMA_REQUEST;
435
436
437 #if CMK_SMP
438 #define SMP_LOCKS               0
439 #define ONE_SEND_QUEUE                  0
440 PCQueue sendRdmaBuf;
441 typedef struct  msg_list_index
442 {
443     PCQueue     sendSmsgBuf;
444     int         pushed;
445     CmiNodeLock   lock;
446 } MSG_LIST_INDEX;
447 char                *destpe_avail;
448 #if  !ONE_SEND_QUEUE && SMP_LOCKS
449     PCQueue     nonEmptyQueues;
450 #endif
451 #else         /* non-smp */
452
453 static RDMA_REQUEST        *sendRdmaBuf = 0;
454 static RDMA_REQUEST        *sendRdmaTail = 0;
455 typedef struct  msg_list_index
456 {
457     int         next;
458     MSG_LIST    *sendSmsgBuf;
459     MSG_LIST    *tail;
460 } MSG_LIST_INDEX;
461
462 #endif
463
464 // buffered send queue
465 #if ! ONE_SEND_QUEUE
466 typedef struct smsg_queue
467 {
468     MSG_LIST_INDEX   *smsg_msglist_index;
469     int               smsg_head_index;
470 } SMSG_QUEUE;
471 #else
472 typedef struct smsg_queue
473 {
474     PCQueue       sendMsgBuf;
475 }  SMSG_QUEUE;
476 #endif
477
478 SMSG_QUEUE                  smsg_queue;
479 #if PIGGYBACK_ACK
480 SMSG_QUEUE                  smsg_ack_queue;
481 #endif
482 #if CMK_USE_OOB
483 SMSG_QUEUE                  smsg_oob_queue;
484 #endif
485
486 #if CMK_SMP
487
488 #define FreeMsgList(d)   free(d);
489 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
490
491 #else
492
493 static MSG_LIST       *msglist_freelist=0;
494
495 #define FreeMsgList(d)  \
496   do { \
497   (d)->next = msglist_freelist;\
498   msglist_freelist = d; \
499   } while (0)
500
501 #define MallocMsgList(d) \
502   do {  \
503   d = msglist_freelist;\
504   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
505              _MEMCHECK(d);\
506   } else msglist_freelist = d->next; \
507   d->next =0;  \
508   } while (0)
509
510 #endif
511
512 #if CMK_SMP
513
514 #define FreeControlMsg(d)      free(d);
515 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
516
517 #else
518
519 static CONTROL_MSG    *control_freelist=0;
520
521 #define FreeControlMsg(d)       \
522   do { \
523   (d)->next = control_freelist;\
524   control_freelist = d; \
525   } while (0);
526
527 #define MallocControlMsg(d) \
528   do {  \
529   d = control_freelist;\
530   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
531              _MEMCHECK(d);\
532   } else control_freelist = d->next;  \
533   } while (0);
534
535 #endif
536
537 #if CMK_SMP
538
539 #define FreeAckMsg(d)      free(d);
540 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
541
542 #else
543
544 static ACK_MSG        *ack_freelist=0;
545
546 #define FreeAckMsg(d)       \
547   do { \
548   (d)->next = ack_freelist;\
549   ack_freelist = d; \
550   } while (0)
551
552 #define MallocAckMsg(d) \
553   do { \
554   d = ack_freelist;\
555   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
556              _MEMCHECK(d);\
557   } else ack_freelist = d->next; \
558   } while (0)
559
560 #endif
561
562
563 # if CMK_SMP
564 #define FreeRdmaRequest(d)       free(d);
565 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
566 #else
567
568 static RDMA_REQUEST         *rdma_freelist = NULL;
569
570 #define FreeRdmaRequest(d)       \
571   do {  \
572   (d)->next = rdma_freelist;\
573   rdma_freelist = d;    \
574   } while (0)
575
576 #define MallocRdmaRequest(d) \
577   do {   \
578   d = rdma_freelist;\
579   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
580              _MEMCHECK(d);\
581   } else rdma_freelist = d->next; \
582   d->next =0;   \
583   } while (0)
584 #endif
585
586 /* reuse gni_post_descriptor_t */
587 static gni_post_descriptor_t *post_freelist=0;
588
589 #if  !CMK_SMP
590 #define FreePostDesc(d)       \
591     (d)->next_descr = post_freelist;\
592     post_freelist = d;
593
594 #define MallocPostDesc(d) \
595   d = post_freelist;\
596   if (d==0) { \
597      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
598      d->next_descr = 0;\
599       _MEMCHECK(d);\
600   } else post_freelist = d->next_descr;
601 #else
602
603 #define FreePostDesc(d)     free(d);
604 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
605
606 #endif
607
608
609 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
610 static int      buffered_smsg_counter = 0;
611
612 /* SmsgSend return success but message sent is not confirmed by remote side */
613 static MSG_LIST *buffered_fma_head = 0;
614 static MSG_LIST *buffered_fma_tail = 0;
615
616 /* functions  */
617 #define IsFree(a,ind)  !( a& (1<<(ind) ))
618 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
619 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
620
621 CpvDeclare(mempool_type*, mempool);
622
623 /* get the upper bound of log 2 */
624 int mylog2(int size)
625 {
626     int op = size;
627     unsigned int ret=0;
628     unsigned int mask = 0;
629     int i;
630     while(op>0)
631     {
632         op = op >> 1;
633         ret++;
634
635     }
636     for(i=1; i<ret; i++)
637     {
638         mask = mask << 1;
639         mask +=1;
640     }
641
642     ret -= ((size &mask) ? 0:1);
643     return ret;
644 }
645
646 static void
647 allgather(void *in,void *out, int len)
648 {
649     static int *ivec_ptr=NULL,already_called=0,job_size=0;
650     int i,rc;
651     int my_rank;
652     char *tmp_buf,*out_ptr;
653
654     if(!already_called) {
655
656         rc = PMI_Get_size(&job_size);
657         CmiAssert(rc == PMI_SUCCESS);
658         rc = PMI_Get_rank(&my_rank);
659         CmiAssert(rc == PMI_SUCCESS);
660
661         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
662         CmiAssert(ivec_ptr != NULL);
663
664         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
665         CmiAssert(rc == PMI_SUCCESS);
666
667         already_called = 1;
668
669     }
670
671     tmp_buf = (char *)malloc(job_size * len);
672     CmiAssert(tmp_buf);
673
674     rc = PMI_Allgather(in,tmp_buf,len);
675     CmiAssert(rc == PMI_SUCCESS);
676
677     out_ptr = out;
678
679     for(i=0;i<job_size;i++) {
680
681         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
682
683     }
684
685     free(tmp_buf);
686 }
687
688 static void
689 allgather_2(void *in,void *out, int len)
690 {
691     //PMI_Allgather is out of order
692     int i,rc, extend_len;
693     int  rank_index;
694     char *out_ptr, *out_ref;
695     char *in2;
696
697     extend_len = sizeof(int) + len;
698     in2 = (char*)malloc(extend_len);
699
700     memcpy(in2, &myrank, sizeof(int));
701     memcpy(in2+sizeof(int), in, len);
702
703     out_ptr = (char*)malloc(mysize*extend_len);
704
705     rc = PMI_Allgather(in2, out_ptr, extend_len);
706     GNI_RC_CHECK("allgather", rc);
707
708     out_ref = out;
709
710     for(i=0;i<mysize;i++) {
711         //rank index 
712         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
713         //copy to the rank index slot
714         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
715     }
716
717     free(out_ptr);
718     free(in2);
719
720 }
721
722 static unsigned int get_gni_nic_address(int device_id)
723 {
724     unsigned int address, cpu_id;
725     gni_return_t status;
726     int i, alps_dev_id=-1,alps_address=-1;
727     char *token, *p_ptr;
728
729     p_ptr = getenv("PMI_GNI_DEV_ID");
730     if (!p_ptr) {
731         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
732        
733         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
734     } else {
735         while ((token = strtok(p_ptr,":")) != NULL) {
736             alps_dev_id = atoi(token);
737             if (alps_dev_id == device_id) {
738                 break;
739             }
740             p_ptr = NULL;
741         }
742         CmiAssert(alps_dev_id != -1);
743         p_ptr = getenv("PMI_GNI_LOC_ADDR");
744         CmiAssert(p_ptr != NULL);
745         i = 0;
746         while ((token = strtok(p_ptr,":")) != NULL) {
747             if (i == alps_dev_id) {
748                 alps_address = atoi(token);
749                 break;
750             }
751             p_ptr = NULL;
752             ++i;
753         }
754         CmiAssert(alps_address != -1);
755         address = alps_address;
756     }
757     return address;
758 }
759
760 static uint8_t get_ptag(void)
761 {
762     char *p_ptr, *token;
763     uint8_t ptag;
764
765     p_ptr = getenv("PMI_GNI_PTAG");
766     CmiAssert(p_ptr != NULL);
767     token = strtok(p_ptr, ":");
768     ptag = (uint8_t)atoi(token);
769     return ptag;
770         
771 }
772
773 static uint32_t get_cookie(void)
774 {
775     uint32_t cookie;
776     char *p_ptr, *token;
777
778     p_ptr = getenv("PMI_GNI_COOKIE");
779     CmiAssert(p_ptr != NULL);
780     token = strtok(p_ptr, ":");
781     cookie = (uint32_t)atoi(token);
782
783     return cookie;
784 }
785
786 #if LARGEPAGE
787 #include <sys/stat.h>
788 #include <fcntl.h>
789 #include <sys/mman.h>
790
791 // size must be _tlbpagesize aligned
792 void *my_get_huge_pages(size_t size)
793 {
794     char filename[512];
795     int fd;
796     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
797     void *ptr = NULL;
798
799     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
800     fd = open(filename, O_RDWR | O_CREAT, mode);
801     if (fd == -1) {
802         CmiAbort("my_get_huge_pages: open filed");
803     }
804     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
805     if (ptr == MAP_FAILED) ptr = NULL;
806 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
807     close(fd);
808     unlink(filename);
809     return ptr;
810 }
811
812 void my_free_huge_pages(void *ptr, int size)
813 {
814 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
815     int ret = munmap(ptr, size);
816     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
817 }
818
819 #endif
820
821 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
822 /* TODO: add any that are related */
823 /* =====End of Definitions of Message-Corruption Related Macros=====*/
824
825
826 #include "machine-lrts.h"
827 #include "machine-common-core.c"
828
829 /* Network progress function is used to poll the network when for
830    messages. This flushes receive buffers on some  implementations*/
831 #if CMK_MACHINE_PROGRESS_DEFINED
832 void CmiMachineProgressImpl() {
833 }
834 #endif
835
836 static void SendRdmaMsg();
837 static void PumpNetworkSmsg();
838 static void PumpLocalRdmaTransactions();
839 static int SendBufferMsg(SMSG_QUEUE *queue);
840
841 #if MACHINE_DEBUG_LOG
842 FILE *debugLog = NULL;
843 static CmiInt8 buffered_recv_msg = 0;
844 int         lrts_smsg_success = 0;
845 int         lrts_received_msg = 0;
846 #endif
847
848 static void sweep_mempool(mempool_type *mptr)
849 {
850     int n = 0;
851     block_header *current = &(mptr->block_head);
852
853     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
854     while( current!= NULL) {
855         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
856         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
857     }
858     printf("[n %d] sweep_mempool slot END.\n", myrank);
859 }
860
861 inline
862 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
863 {
864     block_header *current = *from;
865
866     //while(register_memory_size>= MAX_REG_MEM)
867     //{
868         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
869             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
870
871         *from = current;
872         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
873         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
874         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
875     //}
876     return GNI_RC_SUCCESS;
877 }
878
879 inline 
880 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl)
881 {
882     gni_return_t status = GNI_RC_SUCCESS;
883     //int size = GetMempoolsize(msg);
884     //void *blockaddr = GetMempoolBlockPtr(msg);
885     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
886    
887     block_header *current = &(mptr->block_head);
888     while(register_memory_size>= MAX_REG_MEM)
889     {
890         status = deregisterMemory(mptr, &current);
891         if (status != GNI_RC_SUCCESS) break;
892     }
893     if(register_memory_size>= MAX_REG_MEM) return status;
894
895     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
896     while(1)
897     {
898         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
899         if(status == GNI_RC_SUCCESS)
900         {
901             break;
902         }
903         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
904         {
905             CmiAbort("Memory registor for mempool fails\n");
906         }
907         else
908         {
909             status = deregisterMemory(mptr, &current);
910             if (status != GNI_RC_SUCCESS) break;
911         }
912     }; 
913     return status;
914 }
915
916 inline 
917 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t)
918 {
919     static int rank = -1;
920     int i;
921     gni_return_t status;
922     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
923     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
924     mempool_type *mptr;
925
926     status = registerFromMempool(mptr1, msg, size, t);
927     if (status == GNI_RC_SUCCESS) return status;
928 #if CMK_SMP 
929     for (i=0; i<CmiMyNodeSize()+1; i++) {
930       rank = (rank+1)%(CmiMyNodeSize()+1);
931       mptr = CpvAccessOther(mempool, rank);
932       if (mptr == mptr1) continue;
933       status = registerFromMempool(mptr, msg, size, t);
934       if (status == GNI_RC_SUCCESS) return status;
935     }
936 #endif
937     return  GNI_RC_ERROR_RESOURCE;
938 }
939
940 inline
941 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
942 {
943     MSG_LIST        *msg_tmp;
944     MallocMsgList(msg_tmp);
945     msg_tmp->destNode = destNode;
946     msg_tmp->size   = size;
947     msg_tmp->msg    = msg;
948     msg_tmp->tag    = tag;
949
950 #if !CMK_SMP
951     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
952         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
953         queue->smsg_head_index = destNode;
954         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
955     }else
956     {
957         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
958         queue->smsg_msglist_index[destNode].tail = msg_tmp;
959     }
960 #else
961 #if ONE_SEND_QUEUE
962     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
963 #else
964 #if SMP_LOCKS
965     CmiLock(queue->smsg_msglist_index[destNode].lock);
966     if(queue->smsg_msglist_index[destNode].pushed == 0)
967     {
968         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
969     }
970     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
971     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
972 #else
973     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
974 #endif
975 #endif
976 #endif
977 #if PRINT_SYH
978     buffered_smsg_counter++;
979 #endif
980 }
981
982 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
983 {
984     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
985 }
986
987 inline
988 static void setup_smsg_connection(int destNode)
989 {
990     mdh_addr_list_t  *new_entry = 0;
991     gni_post_descriptor_t *pd;
992     gni_smsg_attr_t      *smsg_attr;
993     gni_return_t status = GNI_RC_NOT_DONE;
994     RDMA_REQUEST        *rdma_request_msg;
995     
996     if(smsg_available_slot == smsg_expand_slots)
997     {
998         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
999         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1000         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1001
1002         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1003             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1004             GNI_MEM_READWRITE,   
1005             -1,
1006             &(new_entry->mdh));
1007         smsg_available_slot = 0; 
1008         new_entry->next = smsg_dynamic_list;
1009         smsg_dynamic_list = new_entry;
1010     }
1011     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1012     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1013     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1014     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1015     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1016     smsg_attr->buff_size = smsg_memlen;
1017     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1018     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1019     smsg_local_attr_vec[destNode] = smsg_attr;
1020     smsg_available_slot++;
1021     MallocPostDesc(pd);
1022     pd->type            = GNI_POST_FMA_PUT;
1023     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1024     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1025     pd->length          = sizeof(gni_smsg_attr_t);
1026     pd->local_addr      = (uint64_t) smsg_attr;
1027     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1028     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1029     pd->src_cq_hndl     = 0;
1030     pd->rdma_mode       = 0;
1031     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1032     print_smsg_attr(smsg_attr);
1033     if(status == GNI_RC_ERROR_RESOURCE )
1034     {
1035         MallocRdmaRequest(rdma_request_msg);
1036         rdma_request_msg->destNode = destNode;
1037         rdma_request_msg->pd = pd;
1038         /* buffer this request */
1039     }
1040 #if PRINT_SYH
1041     if(status != GNI_RC_SUCCESS)
1042        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1043     else
1044         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1045 #endif
1046 }
1047
1048 /* useDynamicSMSG */
1049 inline 
1050 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1051 {
1052     gni_return_t status = GNI_RC_NOT_DONE;
1053
1054     if(mailbox_list->offset == mailbox_list->size)
1055     {
1056         dynamic_smsg_mailbox_t *new_mailbox_entry;
1057         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1058         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1059         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1060         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1061         new_mailbox_entry->offset = 0;
1062         
1063         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1064             new_mailbox_entry->size, smsg_rx_cqh,
1065             GNI_MEM_READWRITE,   
1066             -1,
1067             &(new_mailbox_entry->mem_hndl));
1068
1069         GNI_RC_CHECK("register", status);
1070         new_mailbox_entry->next = mailbox_list;
1071         mailbox_list = new_mailbox_entry;
1072     }
1073     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1074     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1075     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1076     local_smsg_attr->mbox_offset = mailbox_list->offset;
1077     mailbox_list->offset += smsg_memlen;
1078     local_smsg_attr->buff_size = smsg_memlen;
1079     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1080     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1081 }
1082
1083 /* useDynamicSMSG */
1084 inline 
1085 static int connect_to(int destNode)
1086 {
1087     gni_return_t status = GNI_RC_NOT_DONE;
1088     CmiAssert(smsg_connected_flag[destNode] == 0);
1089     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1090     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1091     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1092     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1093     
1094     CMI_GNI_LOCK
1095     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1096     CMI_GNI_UNLOCK
1097     if (status == GNI_RC_ERROR_RESOURCE) {
1098       /* possibly destNode is making connection at the same time */
1099       free(smsg_attr_vector_local[destNode]);
1100       smsg_attr_vector_local[destNode] = NULL;
1101       free(smsg_attr_vector_remote[destNode]);
1102       smsg_attr_vector_remote[destNode] = NULL;
1103       mailbox_list->offset -= smsg_memlen;
1104       return 0;
1105     }
1106     GNI_RC_CHECK("GNI_Post", status);
1107     smsg_connected_flag[destNode] = 1;
1108     return 1;
1109 }
1110
1111 #if PIGGYBACK_ACK
1112 static void * piggyback_ack(int destNode, int msgsize, int *count)
1113 {
1114     int i;
1115     if (PCQueueEmpty(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)) return NULL;
1116     int len = PCQueueLength(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
1117     int piggycount = (SMSG_MAX_MSG - msgsize)/sizeof(uint64_t);
1118     if (piggycount > len+1) piggycount = len + 1;
1119     if (piggycount <= 5) return NULL;
1120     uint64_t * buf = (uint64_t*)CmiTmpAlloc(piggycount * sizeof(uint64_t));
1121     CmiAssert(buf != NULL);
1122     buf[0] = piggycount-1;
1123 //printf("[%d] piggyback_ack: %d\n", myrank, piggycount);
1124     for (i=0; i<piggycount-1; i++) {
1125         MSG_LIST *ptr = (MSG_LIST*)PCQueuePop(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
1126         CmiAssert(ptr != NULL);
1127         ACK_MSG *msg = ptr->msg;
1128         buf[i+1] = msg->source_addr;
1129         FreeAckMsg(msg);
1130         FreeMsgList(ptr);
1131     }
1132     *count = piggycount;
1133     return buf;
1134 }
1135
1136
1137 static void piggyback_ack_done(int destNode, uint64_t *buf, int done)
1138 {
1139     if (!done)
1140     {
1141         int i;
1142         for (i=0; i<buf[0]; i++) {
1143             MSG_LIST *msg_tmp;
1144             MallocMsgList(msg_tmp);
1145             ACK_MSG  *ack_msg;
1146             MallocAckMsg(ack_msg);
1147             ack_msg->source_addr = buf[i+1];
1148             msg_tmp->size = ACK_MSG_SIZE;
1149             msg_tmp->msg = ack_msg;
1150             msg_tmp->tag = ACK_TAG;
1151             msg_tmp->destNode = destNode;
1152             PCQueuePush(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1153         }
1154     }
1155     CmiTmpFree(buf);
1156 }
1157 #endif
1158
1159 inline 
1160 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
1161 {
1162     unsigned int          remote_address;
1163     uint32_t              remote_id;
1164     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1165     gni_smsg_attr_t       *smsg_attr;
1166     gni_post_descriptor_t *pd;
1167     gni_post_state_t      post_state;
1168     char                  *real_data; 
1169
1170     if (useDynamicSMSG) {
1171         switch (smsg_connected_flag[destNode]) {
1172         case 0: 
1173             connect_to(destNode);         /* continue to case 1 */
1174         case 1:                           /* pending connection, do nothing */
1175             status = GNI_RC_NOT_DONE;
1176             if(inbuff ==0)
1177                 buffer_small_msgs(queue, msg, size, destNode, tag);
1178             return status;
1179         }
1180     }
1181 #if CMK_SMP
1182 #if ! ONE_SEND_QUEUE
1183     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1184 #endif
1185     {
1186 #else
1187     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1188     {
1189 #endif
1190         uint64_t *buf = NULL;
1191         int bufsize = 0;
1192 #if PIGGYBACK_ACK
1193         if (tag == SMALL_DATA_TAG) {
1194             int nack = 0;
1195             buf = piggyback_ack(destNode, size, &nack);
1196             if (buf) {
1197                 tag = SMALL_DATA_ACK_TAG;
1198                 bufsize = nack * sizeof(uint64_t);
1199             }
1200         }
1201 #endif
1202         CMI_GNI_LOCK
1203 #if CMK_SMP_TRACE_COMMTHREAD
1204         int oldpe = -1;
1205         int oldeventid = -1;
1206         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1207         { 
1208             START_EVENT();
1209             if ( tag == SMALL_DATA_TAG)
1210                 real_data = (char*)msg; 
1211             else 
1212                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1213             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1214             TRACE_COMM_SET_COMM_MSGID(real_data);
1215         }
1216 #endif
1217         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
1218 #if CMK_SMP_TRACE_COMMTHREAD
1219         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1220 #endif
1221         CMI_GNI_UNLOCK
1222         if(status == GNI_RC_SUCCESS)
1223         {
1224 #if CMK_SMP_TRACE_COMMTHREAD
1225             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == SMALL_DATA_ACK_TAG)
1226             { 
1227                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1228             }
1229 #endif
1230             smsg_send_count ++;
1231         }else
1232             status = GNI_RC_ERROR_RESOURCE;
1233 #if PIGGYBACK_ACK
1234         if (buf) {
1235             piggyback_ack_done(destNode, buf, status==GNI_RC_SUCCESS);
1236             tag = SMALL_DATA_TAG;
1237         }
1238 #endif
1239     }
1240     if(status != GNI_RC_SUCCESS && inbuff ==0)
1241         buffer_small_msgs(queue, msg, size, destNode, tag);
1242     return status;
1243 }
1244
1245 inline 
1246 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1247 {
1248     /* construct a control message and send */
1249     CONTROL_MSG         *control_msg_tmp;
1250     MallocControlMsg(control_msg_tmp);
1251     control_msg_tmp->source_addr = (uint64_t)msg;
1252     control_msg_tmp->seq_id    = seqno;
1253     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1254 #if     USE_LRTS_MEMPOOL
1255     if(size < BIG_MSG)
1256     {
1257         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1258     }
1259     else
1260     {
1261         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1262         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1263         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1264     }
1265 #else
1266     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1267 #endif
1268     return control_msg_tmp;
1269 }
1270
1271 #define BLOCKING_SEND_CONTROL    0
1272
1273 // Large message, send control to receiver, receiver register memory and do a GET, 
1274 // return 1 - send no success
1275 inline
1276 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1277 {
1278     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1279     uint32_t            vmdh_index  = -1;
1280     int                 size;
1281     int                 offset = 0;
1282     uint64_t            source_addr;
1283     int                 register_size; 
1284     void                *msg;
1285
1286     size    =   control_msg_tmp->total_length;
1287     source_addr = control_msg_tmp->source_addr;
1288     register_size = control_msg_tmp->length;
1289
1290 #if  USE_LRTS_MEMPOOL
1291     if( control_msg_tmp->seq_id == 0 ){
1292 #if BLOCKING_SEND_CONTROL
1293         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1294             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1295                 LrtsAdvanceCommunication(0);
1296         }
1297 #endif
1298         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1299         {
1300             msg = (void*)source_addr;
1301             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1302             {
1303                 if(!inbuff)
1304                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1305                 return GNI_RC_ERROR_NOMEM;
1306             }
1307             //register the corresponding mempool
1308             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1309             if(status == GNI_RC_SUCCESS)
1310             {
1311                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1312             }
1313         }else
1314         {
1315             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1316             status = GNI_RC_SUCCESS;
1317         }
1318         if(NoMsgInSend( control_msg_tmp->source_addr))
1319             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1320         else
1321             register_size = 0;
1322     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1323     {
1324         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1325         source_addr += offset;
1326         size = control_msg_tmp->length;
1327 #if BLOCKING_SEND_CONTROL
1328         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1329             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1330                 LrtsAdvanceCommunication(0);
1331         }
1332 #endif
1333         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1334             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1335             {
1336                 if(!inbuff)
1337                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1338                 return GNI_RC_ERROR_NOMEM;
1339             }
1340             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1341             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1342         }
1343         else
1344         {
1345             status = GNI_RC_SUCCESS;
1346         }
1347         register_size = 0;  
1348     }
1349
1350     if(status == GNI_RC_SUCCESS)
1351     {
1352         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff);  
1353         if(status == GNI_RC_SUCCESS)
1354         {
1355             buffered_send_msg += register_size;
1356             if(control_msg_tmp->seq_id == 0)
1357             {
1358                 IncreaseMsgInSend(source_addr);
1359             }
1360             FreeControlMsg(control_msg_tmp);
1361             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1362         }else
1363             status = GNI_RC_ERROR_RESOURCE;
1364
1365     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1366     {
1367         CmiAbort("Memory registor for large msg\n");
1368     }else 
1369     {
1370         status = GNI_RC_ERROR_NOMEM; 
1371         if(!inbuff)
1372             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1373     }
1374     return status;
1375 #else
1376     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1377     if(status == GNI_RC_SUCCESS)
1378     {
1379         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0);  
1380         if(status == GNI_RC_SUCCESS)
1381         {
1382             FreeControlMsg(control_msg_tmp);
1383         }
1384     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1385     {
1386         CmiAbort("Memory registor for large msg\n");
1387     }else 
1388     {
1389         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1390     }
1391     return status;
1392 #endif
1393 }
1394
1395 inline void LrtsPrepareEnvelope(char *msg, int size)
1396 {
1397     CmiSetMsgSize(msg, size);
1398 }
1399
1400 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1401 {
1402     gni_return_t        status  =   GNI_RC_SUCCESS;
1403     uint8_t tag;
1404     CONTROL_MSG         *control_msg_tmp;
1405     int                 oob = ( mode & OUT_OF_BAND);
1406     SMSG_QUEUE          *queue;
1407
1408     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1409 #if CMK_USE_OOB
1410     queue = oob? &smsg_oob_queue : &smsg_queue;
1411 #else
1412     queue = &smsg_queue;
1413 #endif
1414
1415     LrtsPrepareEnvelope(msg, size);
1416
1417 #if PRINT_SYH
1418     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1419 #endif 
1420 #if CMK_SMP && COMM_THREAD_SEND
1421     if(size <= SMSG_MAX_MSG)
1422         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1423     else if (size < BIG_MSG) {
1424         control_msg_tmp =  construct_control_msg(size, msg, 0);
1425         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1426     }
1427     else {
1428           CmiSetMsgSeq(msg, 0);
1429           control_msg_tmp =  construct_control_msg(size, msg, 1);
1430           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1431     }
1432 #else   //non-smp, smp(worker sending)
1433     if(size <= SMSG_MAX_MSG)
1434     {
1435         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1436             CmiFree(msg);
1437     }
1438     else if (size < BIG_MSG) {
1439         control_msg_tmp =  construct_control_msg(size, msg, 0);
1440         send_large_messages(queue, destNode, control_msg_tmp, 0);
1441     }
1442     else {
1443 #if     USE_LRTS_MEMPOOL
1444         CmiSetMsgSeq(msg, 0);
1445         control_msg_tmp =  construct_control_msg(size, msg, 1);
1446         send_large_messages(queue, destNode, control_msg_tmp, 0);
1447 #else
1448         control_msg_tmp =  construct_control_msg(size, msg, 0);
1449         send_large_messages(queue, destNode, control_msg_tmp, 0);
1450 #endif
1451     }
1452 #endif
1453     return 0;
1454 }
1455
1456 static void    PumpDatagramConnection();
1457 static void registerUserTraceEvents() {
1458 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1459     traceRegisterUserEvent("setting up connections", 10);
1460     traceRegisterUserEvent("Receiving small msgs", 20);
1461     traceRegisterUserEvent("Release local transaction", 30);
1462     traceRegisterUserEvent("Sending buffered small msgs", 40);
1463     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1464 #endif
1465 }
1466
1467 static void ProcessDeadlock()
1468 {
1469     static CmiUInt8 *ptr = NULL;
1470     static CmiUInt8  last = 0, mysum, sum;
1471     static int count = 0;
1472     gni_return_t status;
1473     int i;
1474
1475 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1476 //sweep_mempool(CpvAccess(mempool));
1477     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1478     mysum = smsg_send_count + smsg_recv_count;
1479     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1480     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1481     GNI_RC_CHECK("PMI_Allgather", status);
1482     sum = 0;
1483     for (i=0; i<mysize; i++)  sum+= ptr[i];
1484     if (last == 0 || sum == last) 
1485         count++;
1486     else
1487         count = 0;
1488     last = sum;
1489     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1490     if (count == 2) { 
1491         /* detected twice, it is a real deadlock */
1492         if (myrank == 0)  {
1493             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1494             CmiAbort("Fatal> Deadlock detected.");
1495         }
1496
1497     }
1498     _detected_hang = 0;
1499 }
1500
1501 static void CheckProgress()
1502 {
1503     if (smsg_send_count == last_smsg_send_count &&
1504         smsg_recv_count == last_smsg_recv_count ) 
1505     {
1506         _detected_hang = 1;
1507 #if !CMK_SMP
1508         if (_detected_hang) ProcessDeadlock();
1509 #endif
1510
1511     }
1512     else {
1513         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1514         last_smsg_send_count = smsg_send_count;
1515         last_smsg_recv_count = smsg_recv_count;
1516         _detected_hang = 0;
1517     }
1518 }
1519
1520 static void set_limit()
1521 {
1522     //if (!user_set_flag && CmiMyRank() == 0) {
1523     if (CmiMyRank() == 0) {
1524         int mynode = CmiPhysicalNodeID(CmiMyPe());
1525         int numpes = CmiNumPesOnPhysicalNode(mynode);
1526         int numprocesses = numpes / CmiMyNodeSize();
1527         MAX_REG_MEM  = _totalmem / numprocesses;
1528         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1529         if (CmiMyPe() == 0)
1530            printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM, MAX_BUFF_SEND);
1531     }
1532 }
1533
1534 void LrtsPostCommonInit(int everReturn)
1535 {
1536 #if CMK_DIRECT
1537     CmiDirectInit();
1538 #endif
1539 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1540     CpvInitialize(double, projTraceStart);
1541     /* only PE 0 needs to care about registration (to generate sts file). */
1542     if (CmiMyPe() == 0) {
1543         registerMachineUserEventsFunction(&registerUserTraceEvents);
1544     }
1545 #endif
1546
1547 #if CMK_SMP
1548     CmiIdleState *s=CmiNotifyGetState();
1549     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1550     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1551 #else
1552     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1553     if (useDynamicSMSG)
1554     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1555 #endif
1556
1557     if (_checkProgress)
1558 #if CMK_SMP
1559     if (CmiMyRank() == 0)
1560 #endif
1561     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1562 #if !LARGEPAGE
1563     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1564 #endif
1565 }
1566
1567 /* this is called by worker thread */
1568 void LrtsPostNonLocal(){
1569 #if CMK_SMP
1570 #if MULTI_THREAD_SEND
1571     if(mysize == 1) return;
1572     PumpLocalRdmaTransactions();
1573 #if CMK_USE_OOB
1574     if (SendBufferMsg(&smsg_oob_queue) == 1)
1575 #endif
1576     SendBufferMsg(&smsg_queue);
1577 #if PIGGYBACK_ACK
1578     SendBufferMsg(&smsg_ack_queue);
1579 #endif
1580     SendRdmaMsg();
1581 #endif
1582 #endif
1583 }
1584
1585 /* useDynamicSMSG */
1586 static void    PumpDatagramConnection()
1587 {
1588     uint32_t          remote_address;
1589     uint32_t          remote_id;
1590     gni_return_t status;
1591     gni_post_state_t  post_state;
1592     uint64_t          datagram_id;
1593     int i;
1594
1595    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1596    {
1597        if (datagram_id >= mysize) {           /* bound endpoint */
1598            int pe = datagram_id - mysize;
1599            CMI_GNI_LOCK
1600            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1601            CMI_GNI_UNLOCK
1602            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1603            {
1604                CmiAssert(remote_id == pe);
1605                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1606                GNI_RC_CHECK("Dynamic SMSG Init", status);
1607 #if PRINT_SYH
1608                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1609 #endif
1610                CmiAssert(smsg_connected_flag[pe] == 1);
1611                smsg_connected_flag[pe] = 2;
1612            }
1613        }
1614        else {         /* unbound ep */
1615            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1616            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1617            {
1618                CmiAssert(remote_id<mysize);
1619                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1620                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1621                GNI_RC_CHECK("Dynamic SMSG Init", status);
1622 #if PRINT_SYH
1623                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1624 #endif
1625                smsg_connected_flag[remote_id] = 2;
1626
1627                alloc_smsg_attr(&send_smsg_attr);
1628                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1629                GNI_RC_CHECK("post unbound datagram", status);
1630            }
1631        }
1632    }
1633 }
1634
1635 /* pooling CQ to receive network message */
1636 static void PumpNetworkRdmaMsgs()
1637 {
1638     gni_cq_entry_t      event_data;
1639     gni_return_t        status;
1640
1641 }
1642
1643 inline 
1644 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1645 {
1646     RDMA_REQUEST        *rdma_request_msg;
1647     MallocRdmaRequest(rdma_request_msg);
1648     rdma_request_msg->destNode = inst_id;
1649     rdma_request_msg->pd = pd;
1650 #if CMK_SMP
1651     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1652 #else
1653     if(sendRdmaBuf == 0)
1654     {
1655         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1656     }else{
1657         sendRdmaTail->next = rdma_request_msg;
1658         sendRdmaTail =  rdma_request_msg;
1659     }
1660 #endif
1661
1662 }
1663
1664 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1665 static void PumpNetworkSmsg()
1666 {
1667     uint64_t            inst_id;
1668     int                 ret;
1669     gni_cq_entry_t      event_data;
1670     gni_return_t        status, status2;
1671     void                *header;
1672     uint8_t             msg_tag;
1673     int                 msg_nbytes;
1674     void                *msg_data;
1675     gni_mem_handle_t    msg_mem_hndl;
1676     gni_smsg_attr_t     *smsg_attr;
1677     gni_smsg_attr_t     *remote_smsg_attr;
1678     int                 init_flag;
1679     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1680     uint64_t            source_addr;
1681     SMSG_QUEUE         *queue = &smsg_queue;
1682 #if     CMK_DIRECT
1683     cmidirectMsg        *direct_msg;
1684 #endif
1685     while(1)
1686     {
1687         CMI_GNI_LOCK
1688         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1689         CMI_GNI_UNLOCK
1690         if(status != GNI_RC_SUCCESS)
1691             break;
1692         inst_id = GNI_CQ_GET_INST_ID(event_data);
1693         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1694 #if PRINT_SYH
1695         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1696 #endif
1697         if (useDynamicSMSG) {
1698             /* subtle: smsg may come before connection is setup */
1699             while (smsg_connected_flag[inst_id] != 2) 
1700                PumpDatagramConnection();
1701         }
1702         msg_tag = GNI_SMSG_ANY_TAG;
1703         while(1) {
1704             CMI_GNI_LOCK
1705             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1706             CMI_GNI_UNLOCK
1707             if (status != GNI_RC_SUCCESS)
1708                 break;
1709 #if PRINT_SYH
1710             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1711 #endif
1712             /* copy msg out and then put into queue (small message) */
1713             switch (msg_tag) {
1714 #if PIGGYBACK_ACK
1715             case SMALL_DATA_ACK_TAG:
1716             {
1717                 int i;
1718                 uint64_t *buf = (uint64_t*)header;
1719                 int piggycount = buf[0];
1720 //printf("[%d] got piggyback msg: %d\n", myrank, piggycount);
1721                 for (i=0; i<piggycount; i++) {
1722                     void *msg = (void*)(buf[i+1]);
1723                     CmiAssert(msg != NULL);
1724                     DecreaseMsgInSend(msg);
1725                     if(NoMsgInSend(msg))
1726                         buffered_send_msg -= GetMempoolsize(msg);
1727                     CmiFree(msg);
1728                 }
1729                 header = buf + piggycount + 1;
1730                 msg_nbytes -= (piggycount+1)*sizeof(uint64_t);
1731             }
1732 #endif
1733             case SMALL_DATA_TAG:
1734             {
1735                 START_EVENT();
1736                 msg_nbytes = CmiGetMsgSize(header);
1737                 msg_data    = CmiAlloc(msg_nbytes);
1738                 memcpy(msg_data, (char*)header, msg_nbytes);
1739 #if CMK_SMP_TRACE_COMMTHREAD
1740                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1741 #endif
1742                 handleOneRecvedMsg(msg_nbytes, msg_data);
1743                 break;
1744             }
1745             case LMSG_INIT_TAG:
1746             {
1747                 getLargeMsgRequest(header, inst_id);
1748                 break;
1749             }
1750             case ACK_TAG:   //msg fit into mempool
1751             {
1752                 /* Get is done, release message . Now put is not used yet*/
1753                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1754 #if ! USE_LRTS_MEMPOOL
1755                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1756 #else
1757                 DecreaseMsgInSend(msg);
1758 #endif
1759                 if(NoMsgInSend(msg))
1760                     buffered_send_msg -= GetMempoolsize(msg);
1761                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1762                 CmiFree(msg);
1763                 break;
1764             }
1765             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1766             {
1767                 header_tmp = (CONTROL_MSG *) header;
1768                 void *msg = (void*)(header_tmp->source_addr);
1769                 int cur_seq = CmiGetMsgSeq(msg);
1770                 int offset = ONE_SEG*(cur_seq+1);
1771                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1772                 buffered_send_msg -= header_tmp->length;
1773                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1774                 if (remain_size < 0) remain_size = 0;
1775                 CmiSetMsgSize(msg, remain_size);
1776                 if(remain_size <= 0) //transaction done
1777                 {
1778                     CmiFree(msg);
1779                 }else if (header_tmp->total_length > offset)
1780                 {
1781                     CmiSetMsgSeq(msg, cur_seq+1);
1782                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1783                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1784                     //send next seg
1785                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1786                          // pipelining
1787                     if (header_tmp->seq_id == 1) {
1788                       int i;
1789                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1790                         int seq = cur_seq+i+2;
1791                         CmiSetMsgSeq(msg, seq-1);
1792                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1793                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1794                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1795                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1796                       }
1797                     }
1798                 }
1799                 break;
1800             }
1801 #if CMK_PERSISTENT_COMM
1802             case PUT_DONE_TAG: //persistent message
1803                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1804                 int size = ((CONTROL_MSG *) header)->length;
1805                 CmiReference(msg);
1806                 handleOneRecvedMsg(size, msg); 
1807                 break;
1808 #endif
1809 #if CMK_DIRECT
1810             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1811                 //create a trigger message
1812                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
1813                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
1814                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
1815                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
1816                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1817                 break;
1818 #endif
1819             default: {
1820                 printf("weird tag problem\n");
1821                 CmiAbort("Unknown tag\n");
1822                      }
1823             }               // end switch
1824 #if PRINT_SYH
1825             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1826 #endif
1827             CMI_GNI_LOCK
1828             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1829             CMI_GNI_UNLOCK
1830             smsg_recv_count ++;
1831             msg_tag = GNI_SMSG_ANY_TAG;
1832         } //endwhile getNext
1833     }   //end while GetEvent
1834     if(status == GNI_RC_ERROR_RESOURCE)
1835     {
1836         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
1837         GNI_RC_CHECK("Smsg_rx_cq full", status);
1838     }
1839 }
1840
1841 static void printDesc(gni_post_descriptor_t *pd)
1842 {
1843     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1844 }
1845
1846 // for BIG_MSG called on receiver side for receiving control message
1847 // LMSG_INIT_TAG
1848 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1849 {
1850 #if     USE_LRTS_MEMPOOL
1851     CONTROL_MSG         *request_msg;
1852     gni_return_t        status = GNI_RC_SUCCESS;
1853     void                *msg_data;
1854     gni_post_descriptor_t *pd;
1855     gni_mem_handle_t    msg_mem_hndl;
1856     int source, size, transaction_size, offset = 0;
1857     size_t     register_size = 0;
1858
1859     // initial a get to transfer data from the sender side */
1860     request_msg = (CONTROL_MSG *) header;
1861     size = request_msg->total_length;
1862     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1863     if(request_msg->seq_id < 2)   {
1864         msg_data = CmiAlloc(size);
1865         CmiSetMsgSeq(msg_data, 0);
1866         _MEMCHECK(msg_data);
1867     }
1868     else {
1869         offset = ONE_SEG*(request_msg->seq_id-1);
1870         msg_data = (char*)request_msg->dest_addr + offset;
1871     }
1872    
1873     MallocPostDesc(pd);
1874     pd->cqwrite_value = request_msg->seq_id;
1875     if( request_msg->seq_id == 0)
1876     {
1877         pd->local_mem_hndl= GetMemHndl(msg_data);
1878         transaction_size = ALIGN64(size);
1879         if(IsMemHndlZero(pd->local_mem_hndl))
1880         {   
1881             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1882             if(status == GNI_RC_SUCCESS)
1883             {
1884                 pd->local_mem_hndl = GetMemHndl(msg_data);
1885             }
1886             else
1887             {
1888                 SetMemHndlZero(pd->local_mem_hndl);
1889             }
1890         }
1891         if(NoMsgInRecv( (void*)(msg_data)))
1892             register_size = GetMempoolsize((void*)(msg_data));
1893         else
1894             register_size = 0;
1895     }
1896     else{
1897         transaction_size = ALIGN64(request_msg->length);
1898         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1899         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1900         {
1901             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1902         }
1903     }
1904     pd->first_operand = ALIGN64(size);                   //  total length
1905
1906     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1907         pd->type            = GNI_POST_FMA_GET;
1908     else
1909         pd->type            = GNI_POST_RDMA_GET;
1910 #if REMOTE_EVENT
1911     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1912 #else
1913     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1914 #endif
1915     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1916     pd->length          = transaction_size;
1917     pd->local_addr      = (uint64_t) msg_data;
1918     pd->remote_addr     = request_msg->source_addr + offset;
1919     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1920     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1921     pd->rdma_mode       = 0;
1922     pd->amo_cmd         = 0;
1923
1924     //memory registration success
1925     if(status == GNI_RC_SUCCESS)
1926     {
1927         CMI_GNI_LOCK
1928         if(pd->type == GNI_POST_RDMA_GET) 
1929             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1930         else
1931             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1932         CMI_GNI_UNLOCK
1933
1934         if(status == GNI_RC_SUCCESS )
1935         {
1936             if(pd->cqwrite_value == 0)
1937             {
1938 #if MACHINE_DEBUG_LOG
1939                 buffered_recv_msg += register_size;
1940                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1941 #endif
1942                 IncreaseMsgInRecv(msg_data);
1943             }
1944         }
1945     }else
1946     {
1947         SetMemHndlZero((pd->local_mem_hndl));
1948     }
1949     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1950     {
1951         bufferRdmaMsg(inst_id, pd); 
1952     }else {
1953          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1954         GNI_RC_CHECK("GetLargeAFter posting", status);
1955     }
1956 #else
1957     CONTROL_MSG         *request_msg;
1958     gni_return_t        status;
1959     void                *msg_data;
1960     gni_post_descriptor_t *pd;
1961     RDMA_REQUEST        *rdma_request_msg;
1962     gni_mem_handle_t    msg_mem_hndl;
1963     //int source;
1964     // initial a get to transfer data from the sender side */
1965     request_msg = (CONTROL_MSG *) header;
1966     msg_data = CmiAlloc(request_msg->length);
1967     _MEMCHECK(msg_data);
1968
1969     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1970
1971     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1972     {
1973         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1974     }
1975
1976     MallocPostDesc(pd);
1977     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1978         pd->type            = GNI_POST_FMA_GET;
1979     else
1980         pd->type            = GNI_POST_RDMA_GET;
1981 #if REMOTE_EVENT
1982     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1983 #else
1984     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1985 #endif
1986     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1987     pd->length          = ALIGN64(request_msg->length);
1988     pd->local_addr      = (uint64_t) msg_data;
1989     pd->remote_addr     = request_msg->source_addr;
1990     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1991     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1992     pd->rdma_mode       = 0;
1993     pd->amo_cmd         = 0;
1994
1995     //memory registration successful
1996     if(status == GNI_RC_SUCCESS)
1997     {
1998         pd->local_mem_hndl  = msg_mem_hndl;
1999         CMI_GNI_LOCK
2000         if(pd->type == GNI_POST_RDMA_GET) 
2001             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2002         else
2003             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2004         CMI_GNI_UNLOCK
2005     }else
2006     {
2007         SetMemHndlZero(pd->local_mem_hndl);
2008     }
2009     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2010     {
2011         MallocRdmaRequest(rdma_request_msg);
2012         rdma_request_msg->next = 0;
2013         rdma_request_msg->destNode = inst_id;
2014         rdma_request_msg->pd = pd;
2015         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2016     }else {
2017         GNI_RC_CHECK("AFter posting", status);
2018     }
2019 #endif
2020 }
2021
2022 static void PumpLocalRdmaTransactions()
2023 {
2024     gni_cq_entry_t          ev;
2025     gni_return_t            status;
2026     uint64_t                type, inst_id;
2027     gni_post_descriptor_t   *tmp_pd;
2028     MSG_LIST                *ptr;
2029     CONTROL_MSG             *ack_msg_tmp;
2030     ACK_MSG                 *ack_msg;
2031     uint8_t                 msg_tag;
2032 #if CMK_DIRECT
2033     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2034 #endif
2035     SMSG_QUEUE         *queue = &smsg_queue;
2036
2037     while(1) {
2038         CMI_GNI_LOCK 
2039         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
2040         CMI_GNI_UNLOCK
2041         if(status != GNI_RC_SUCCESS) break;
2042         
2043         type = GNI_CQ_GET_TYPE(ev);
2044         if (type == GNI_CQ_EVENT_TYPE_POST)
2045         {
2046             inst_id     = GNI_CQ_GET_INST_ID(ev);
2047 #if PRINT_SYH
2048             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2049 #endif
2050             CMI_GNI_LOCK
2051             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
2052             CMI_GNI_UNLOCK
2053
2054             switch (tmp_pd->type) {
2055 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2056             case GNI_POST_RDMA_PUT:
2057 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2058                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2059 #endif
2060             case GNI_POST_FMA_PUT:
2061                 if(tmp_pd->amo_cmd == 1) {
2062                     //sender ACK to receiver to trigger it is done
2063                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2064                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2065                     msg_tag = DIRECT_PUT_DONE_TAG;
2066                 }
2067                 else {
2068                     CmiFree((void *)tmp_pd->local_addr);
2069                     MallocControlMsg(ack_msg_tmp);
2070                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2071                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2072                     msg_tag = PUT_DONE_TAG;
2073                 }
2074                 break;
2075 #endif
2076             case GNI_POST_RDMA_GET:
2077             case GNI_POST_FMA_GET:  {
2078 #if  ! USE_LRTS_MEMPOOL
2079                 MallocControlMsg(ack_msg_tmp);
2080                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2081                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2082                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2083                 msg_tag = ACK_TAG;  
2084 #else
2085                 int seq_id = tmp_pd->cqwrite_value;
2086                 if(seq_id > 0)      // BIG_MSG
2087                 {
2088                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2089                     MallocControlMsg(ack_msg_tmp);
2090                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2091                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2092                     ack_msg_tmp->seq_id = seq_id;
2093                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2094                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2095                     ack_msg_tmp->length = tmp_pd->length;
2096                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2097                     msg_tag = BIG_MSG_TAG; 
2098                 } 
2099                 else
2100                 {
2101                     MallocAckMsg(ack_msg);
2102                     ack_msg->source_addr = tmp_pd->remote_addr;
2103                     msg_tag = ACK_TAG;  
2104                     // ack_msg_tmp->dest_addr = tmp_pd->local_addr; ???
2105                 }
2106 #endif
2107                 break;
2108             }
2109             default:
2110                 CmiPrintf("type=%d\n", tmp_pd->type);
2111                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
2112             }      /* end of switch */
2113
2114 #if CMK_DIRECT
2115             if (tmp_pd->amo_cmd == 1) {
2116                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
2117                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2118             }
2119             else
2120 #endif
2121             if (msg_tag == ACK_TAG) {
2122 #if ! PIGGYBACK_ACK
2123                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
2124                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2125 #else
2126                 buffer_small_msgs(&smsg_ack_queue, ack_msg, ACK_MSG_SIZE, inst_id, msg_tag);
2127 #endif
2128             }
2129             else {
2130                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
2131                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2132             }
2133 #if CMK_PERSISTENT_COMM
2134             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2135 #endif
2136             {
2137                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2138 #if PRINT_SYH
2139                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2140 #endif
2141                     START_EVENT();
2142                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2143                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2144 #if MACHINE_DEBUG_LOG
2145                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2146                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2147                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2148 #endif
2149 #if CMK_SMP_TRACE_COMMTHREAD
2150                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2151 #endif
2152                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2153                 }else if(msg_tag == BIG_MSG_TAG){
2154                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2155                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2156                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2157                         START_EVENT();
2158 #if PRINT_SYH
2159                         printf("Pipeline msg done [%d]\n", myrank);
2160 #endif
2161 #if CMK_SMP_TRACE_COMMTHREAD
2162                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2163 #endif
2164                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2165                     }
2166                 }
2167             }
2168             FreePostDesc(tmp_pd);
2169         }
2170     } //end while
2171     if(status == GNI_RC_ERROR_RESOURCE)
2172     {
2173         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2174         GNI_RC_CHECK("Smsg_tx_cq full", status);
2175     }
2176 }
2177
2178 static void  SendRdmaMsg()
2179 {
2180     gni_return_t            status = GNI_RC_SUCCESS;
2181     gni_mem_handle_t        msg_mem_hndl;
2182     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2183     RDMA_REQUEST            *pre = 0;
2184     uint64_t                register_size = 0;
2185     void                    *msg;
2186     int                     i;
2187 #if CMK_SMP
2188     int len = PCQueueLength(sendRdmaBuf);
2189     for (i=0; i<len; i++)
2190     {
2191         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2192         if (ptr == NULL) break;
2193 #else
2194     ptr = sendRdmaBuf;
2195     while (ptr!=0)
2196     {
2197 #endif 
2198         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2199         gni_post_descriptor_t *pd = ptr->pd;
2200         status = GNI_RC_SUCCESS;
2201         
2202         if(pd->cqwrite_value == 0)
2203         {
2204             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2205             {
2206                 msg = (void*)(pd->local_addr);
2207                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
2208                 if(status == GNI_RC_SUCCESS)
2209                 {
2210                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2211                 }
2212             }else
2213             {
2214                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2215             }
2216             if(NoMsgInRecv( (void*)(pd->local_addr)))
2217                 register_size = GetMempoolsize((void*)(pd->local_addr));
2218             else
2219                 register_size = 0;
2220         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2221         {
2222             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
2223         }
2224         if(status == GNI_RC_SUCCESS)        //mem register good
2225         {
2226             CMI_GNI_LOCK
2227             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2228                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2229             else
2230                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2231             CMI_GNI_UNLOCK
2232             if(status == GNI_RC_SUCCESS)    //post good
2233             {
2234 #if !CMK_SMP
2235                 tmp_ptr = ptr;
2236                 if(pre != 0) {
2237                     pre->next = ptr->next;
2238                 }
2239                 else {
2240                     sendRdmaBuf = ptr->next;
2241                 }
2242                 ptr = ptr->next;
2243                 FreeRdmaRequest(tmp_ptr);
2244 #endif
2245                 if(pd->cqwrite_value == 0)
2246                 {
2247                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2248                 }
2249 #if MACHINE_DEBUG_LOG
2250                 buffered_recv_msg += register_size;
2251                 MACHSTATE(8, "GO request from buffered\n"); 
2252 #endif
2253             }else           // cannot post
2254             {
2255 #if CMK_SMP
2256                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2257 #else
2258                 pre = ptr;
2259                 ptr = ptr->next;
2260 #endif
2261                 break;
2262             }
2263         } else          //memory registration fails
2264         {
2265 #if CMK_SMP
2266             PCQueuePush(sendRdmaBuf, (char*)ptr);
2267 #else
2268             pre = ptr;
2269             ptr = ptr->next;
2270 #endif
2271         }
2272     } //end while
2273 #if ! CMK_SMP
2274     if(ptr == 0)
2275         sendRdmaTail = pre;
2276 #endif
2277 }
2278
2279 // return 1 if all messages are sent
2280 static int SendBufferMsg(SMSG_QUEUE *queue)
2281 {
2282     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2283     CONTROL_MSG         *control_msg_tmp;
2284     gni_return_t        status;
2285     int                 done = 1;
2286     uint64_t            register_size;
2287     void                *register_addr;
2288     int                 index_previous = -1;
2289 #if CMI_EXERT_SEND_CAP
2290     int                 sent_cnt = 0;
2291 #endif
2292
2293 #if CMK_SMP
2294     int          index = 0;
2295 #if ONE_SEND_QUEUE
2296     memset(destpe_avail, 0, mysize * sizeof(char));
2297     for (index=0; index<1; index++)
2298     {
2299         int i, len = PCQueueLength(queue->sendMsgBuf);
2300         for (i=0; i<len; i++) 
2301         {
2302             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2303             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2304                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2305                 continue;
2306             }
2307 #else
2308 #if SMP_LOCKS
2309     int nonempty = PCQueueLength(nonEmptyQueues);
2310     for(index =0; index<nonempty; index++)
2311     {
2312         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2313         PCQueue current_queue= current_list-> sendSmsgBuf;
2314         CmiLock(current_list->lock);
2315         int i, len = PCQueueLength(current_queue);
2316         current_list->pushed = 0;
2317         CmiUnlock(current_list->lock);
2318 #else
2319     for(index =0; index<mysize; index++)
2320     {
2321         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2322         int i, len = PCQueueLength(current_queue);
2323 #endif
2324         for (i=0; i<len; i++) 
2325         {
2326             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2327             if (ptr == 0) break;
2328 #endif
2329 #else
2330     int index = queue->smsg_head_index;
2331     while(index != -1)
2332     {
2333         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2334         pre = 0;
2335         while(ptr != 0)
2336         {
2337 #endif
2338             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2339             status = GNI_RC_ERROR_RESOURCE;
2340             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2341                 /* connection not exists yet */
2342             }
2343             else
2344             switch(ptr->tag)
2345             {
2346             case SMALL_DATA_TAG:
2347                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2348                 if(status == GNI_RC_SUCCESS)
2349                 {
2350                     CmiFree(ptr->msg);
2351                 }
2352                 break;
2353             case LMSG_INIT_TAG:
2354                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2355                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2356                 break;
2357             case ACK_TAG:
2358                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2359                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2360                 break;
2361             case BIG_MSG_TAG:
2362                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2363                 if(status == GNI_RC_SUCCESS)
2364                 {
2365                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2366                 }
2367                 break;
2368 #if CMK_DIRECT
2369             case DIRECT_PUT_DONE_TAG:
2370                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2371                 if(status == GNI_RC_SUCCESS)
2372                 {
2373                     free((CMK_DIRECT_HEADER*)ptr->msg);
2374                 }
2375                 break;
2376
2377 #endif
2378             default:
2379                 printf("Weird tag\n");
2380                 CmiAbort("should not happen\n");
2381             }       // end switch
2382             if(status == GNI_RC_SUCCESS)
2383             {
2384 #if !CMK_SMP
2385                 tmp_ptr = ptr;
2386                 if(pre)
2387                 {
2388                     ptr = pre ->next = ptr->next;
2389                 }else
2390                 {
2391                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2392                 }
2393                 FreeMsgList(tmp_ptr);
2394 #else
2395                 FreeMsgList(ptr);
2396 #endif
2397 #if PRINT_SYH
2398                 buffered_smsg_counter--;
2399                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2400 #endif
2401 #if CMI_EXERT_SEND_CAP
2402                 sent_cnt++;
2403                 if(sent_cnt == SEND_CAP)
2404                     break;
2405 #endif
2406             }else {
2407 #if CMK_SMP
2408 #if ONE_SEND_QUEUE
2409                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2410 #else
2411                 PCQueuePush(current_queue, (char*)ptr);
2412 #endif
2413 #else
2414                 pre = ptr;
2415                 ptr=ptr->next;
2416 #endif
2417                 done = 0;
2418                 if(status == GNI_RC_ERROR_RESOURCE)
2419                 {
2420 #if CMK_SMP && ONE_SEND_QUEUE 
2421                     destpe_avail[ptr->destNode] = 1;
2422 #else
2423                     break;
2424 #endif
2425                 }
2426             } 
2427         } //end while
2428 #if !CMK_SMP
2429         if(ptr == 0)
2430             queue->smsg_msglist_index[index].tail = pre;
2431         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2432         {
2433             if(index_previous != -1)
2434                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2435             else
2436                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2437         }
2438         else {
2439             index_previous = index;
2440         }
2441         index = queue->smsg_msglist_index[index].next;
2442 #else
2443 #if !ONE_SEND_QUEUE && SMP_LOCKS
2444         CmiLock(current_list->lock);
2445         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2446         {
2447             current_list->pushed = 1;
2448             PCQueuePush(nonEmptyQueues, current_list);
2449         }
2450         CmiUnlock(current_list->lock); 
2451 #endif
2452 #endif
2453
2454 #if CMI_EXERT_SEND_CAP
2455         if(sent_cnt == SEND_CAP)
2456                 break;
2457 #endif
2458     }   // end pooling for all cores
2459     return done;
2460 }
2461
2462 static void ProcessDeadlock();
2463 void LrtsAdvanceCommunication(int whileidle)
2464 {
2465     static int count = 0;
2466     /*  Receive Msg first */
2467 #if CMK_SMP_TRACE_COMMTHREAD
2468     double startT, endT;
2469 #endif
2470     if (useDynamicSMSG && whileidle)
2471     {
2472 #if CMK_SMP_TRACE_COMMTHREAD
2473         startT = CmiWallTimer();
2474 #endif
2475         PumpDatagramConnection();
2476 #if CMK_SMP_TRACE_COMMTHREAD
2477         endT = CmiWallTimer();
2478         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2479 #endif
2480     }
2481
2482 #if CMK_SMP_TRACE_COMMTHREAD
2483     startT = CmiWallTimer();
2484 #endif
2485     PumpNetworkSmsg();
2486     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2487 #if CMK_SMP_TRACE_COMMTHREAD
2488     endT = CmiWallTimer();
2489     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2490 #endif
2491
2492 #if CMK_SMP_TRACE_COMMTHREAD
2493     startT = CmiWallTimer();
2494 #endif
2495     PumpLocalRdmaTransactions();
2496     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2497 #if CMK_SMP_TRACE_COMMTHREAD
2498     endT = CmiWallTimer();
2499     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2500 #endif
2501     /* Send buffered Message */
2502 #if CMK_SMP_TRACE_COMMTHREAD
2503     startT = CmiWallTimer();
2504 #endif
2505 #if CMK_USE_OOB
2506     if (SendBufferMsg(&smsg_oob_queue) == 1)
2507 #endif
2508     {
2509 #if PIGGYBACK_ACK
2510     //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
2511     if (SendBufferMsg(&smsg_queue) == 1) {
2512         //if (count++ % 10 == 0) 
2513         SendBufferMsg(&smsg_ack_queue);
2514     }
2515 #else
2516     SendBufferMsg(&smsg_queue);
2517 #endif
2518     }
2519     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2520 #if CMK_SMP_TRACE_COMMTHREAD
2521     endT = CmiWallTimer();
2522     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2523 #endif
2524
2525 #if CMK_SMP_TRACE_COMMTHREAD
2526     startT = CmiWallTimer();
2527 #endif
2528     SendRdmaMsg();
2529     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2530 #if CMK_SMP_TRACE_COMMTHREAD
2531     endT = CmiWallTimer();
2532     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2533 #endif
2534
2535 #if CMK_SMP
2536     if (_detected_hang)  ProcessDeadlock();
2537 #endif
2538 }
2539
2540 /* useDynamicSMSG */
2541 static void _init_dynamic_smsg()
2542 {
2543     gni_return_t status;
2544     uint32_t     vmdh_index = -1;
2545     int i;
2546
2547     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2548     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2549     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2550     for(i=0; i<mysize; i++) {
2551         smsg_connected_flag[i] = 0;
2552         smsg_attr_vector_local[i] = NULL;
2553         smsg_attr_vector_remote[i] = NULL;
2554     }
2555     if(mysize <=512)
2556     {
2557         SMSG_MAX_MSG = 4096;
2558     }else if (mysize <= 4096)
2559     {
2560         SMSG_MAX_MSG = 4096/mysize * 1024;
2561     }else if (mysize <= 16384)
2562     {
2563         SMSG_MAX_MSG = 512;
2564     }else {
2565         SMSG_MAX_MSG = 256;
2566     }
2567
2568     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2569     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2570     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2571     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2572     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2573
2574     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2575     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2576     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2577     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2578     mailbox_list->offset = 0;
2579     mailbox_list->next = 0;
2580     
2581     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2582         mailbox_list->size, smsg_rx_cqh,
2583         GNI_MEM_READWRITE,   
2584         vmdh_index,
2585         &(mailbox_list->mem_hndl));
2586     GNI_RC_CHECK("MEMORY registration for smsg", status);
2587
2588     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2589     GNI_RC_CHECK("Unbound EP", status);
2590     
2591     alloc_smsg_attr(&send_smsg_attr);
2592
2593     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2594     GNI_RC_CHECK("post unbound datagram", status);
2595
2596       /* always pre-connect to proc 0 */
2597     //if (myrank != 0) connect_to(0);
2598 }
2599
2600 static void _init_static_smsg()
2601 {
2602     gni_smsg_attr_t      *smsg_attr;
2603     gni_smsg_attr_t      remote_smsg_attr;
2604     gni_smsg_attr_t      *smsg_attr_vec;
2605     gni_mem_handle_t     my_smsg_mdh_mailbox;
2606     int      ret, i;
2607     gni_return_t status;
2608     uint32_t              vmdh_index = -1;
2609     mdh_addr_t            base_infor;
2610     mdh_addr_t            *base_addr_vec;
2611     char *env;
2612
2613     if(mysize <=512)
2614     {
2615         SMSG_MAX_MSG = 1024;
2616     }else if (mysize <= 4096)
2617     {
2618         SMSG_MAX_MSG = 1024;
2619     }else if (mysize <= 16384)
2620     {
2621         SMSG_MAX_MSG = 512;
2622     }else {
2623         SMSG_MAX_MSG = 256;
2624     }
2625     
2626     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2627     if (env) SMSG_MAX_MSG = atoi(env);
2628     CmiAssert(SMSG_MAX_MSG > 0);
2629
2630     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2631     
2632     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2633     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2634     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2635     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2636     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2637     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2638     CmiAssert(ret == 0);
2639     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2640     
2641     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2642             smsg_memlen*(mysize), smsg_rx_cqh,
2643             GNI_MEM_READWRITE,   
2644             vmdh_index,
2645             &my_smsg_mdh_mailbox);
2646     register_memory_size += smsg_memlen*(mysize);
2647     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2648
2649     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2650     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2651
2652     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2653     base_infor.mdh =  my_smsg_mdh_mailbox;
2654     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2655
2656     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2657  
2658     for(i=0; i<mysize; i++)
2659     {
2660         if(i==myrank)
2661             continue;
2662         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2663         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2664         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2665         smsg_attr[i].mbox_offset = i*smsg_memlen;
2666         smsg_attr[i].buff_size = smsg_memlen;
2667         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2668         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2669     }
2670
2671     for(i=0; i<mysize; i++)
2672     {
2673         if (myrank == i) continue;
2674
2675         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2676         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2677         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2678         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2679         remote_smsg_attr.buff_size = smsg_memlen;
2680         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2681         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2682
2683         /* initialize the smsg channel */
2684         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2685         GNI_RC_CHECK("SMSG Init", status);
2686     } //end initialization
2687
2688     free(base_addr_vec);
2689     free(smsg_attr);
2690
2691     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2692     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2693
2694
2695 inline
2696 static void _init_send_queue(SMSG_QUEUE *queue)
2697 {
2698      int i;
2699 #if ONE_SEND_QUEUE
2700      queue->sendMsgBuf = PCQueueCreate();
2701      destpe_avail = (char*)malloc(mysize * sizeof(char));
2702 #else
2703      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2704 #if CMK_SMP && SMP_LOCKS
2705      nonEmptyQueues = PCQueueCreate();
2706 #endif
2707      for(i =0; i<mysize; i++)
2708      {
2709 #if CMK_SMP
2710          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2711 #if SMP_LOCKS
2712          queue->smsg_msglist_index[i].pushed = 0;
2713          queue->smsg_msglist_index[i].lock = CmiCreateLock();
2714 #endif
2715 #else
2716          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2717          queue->smsg_msglist_index[i].next = -1;
2718          queue->smsg_head_index = -1;
2719 #endif
2720         
2721      }
2722 #endif
2723 }
2724
2725 inline
2726 static void _init_smsg()
2727 {
2728     if(mysize > 1) {
2729         if (useDynamicSMSG)
2730             _init_dynamic_smsg();
2731         else
2732             _init_static_smsg();
2733     }
2734
2735     _init_send_queue(&smsg_queue);
2736 #if PIGGYBACK_ACK
2737     _init_send_queue(&smsg_ack_queue);
2738 #endif
2739 #if CMK_USE_OOB
2740     _init_send_queue(&smsg_oob_queue);
2741 #endif
2742 }
2743
2744 static void _init_static_msgq()
2745 {
2746     gni_return_t status;
2747     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2748     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2749     msgq_attrs.smsg_q_sz = 1;
2750     msgq_attrs.rcv_pool_sz = 1;
2751     msgq_attrs.num_msgq_eps = 2;
2752     msgq_attrs.nloc_insts = 8;
2753     msgq_attrs.modes = 0;
2754     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2755
2756     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2757     GNI_RC_CHECK("MSGQ Init", status);
2758
2759
2760 }
2761
2762 #if CMK_SMP && STEAL_MEMPOOL
2763 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2764 {
2765     void *pool = NULL;
2766     int i, k;
2767     // check other ranks
2768     for (k=0; k<CmiMyNodeSize()+1; k++) {
2769         i = (CmiMyRank()+k)%CmiMyNodeSize();
2770         if (i==CmiMyRank()) continue;
2771         mempool_type *mptr = CpvAccessOther(mempool, i);
2772         CmiLock(mptr->mempoolLock);
2773         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2774         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2775             CmiUnlock(mptr->mempoolLock);
2776             continue;
2777         }
2778         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2779         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2780             /* search in the free list */
2781           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2782           mempool_header *current = free_header;
2783           while (current) {
2784             if (current->next_free == (char*)header-(char*)mptr) break;
2785             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2786           }
2787           if (current == NULL) {         /* not found in free list */
2788             CmiUnlock(mptr->mempoolLock);
2789             continue;
2790           }
2791 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2792             /* search the previous memblock, and remove the tail */
2793           mempool_block *ptr = (mempool_block *)mptr;
2794           while (ptr) {
2795             if (ptr->memblock_next == mptr->memblock_tail) break;
2796             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2797           }
2798           CmiAssert(ptr!=NULL);
2799           ptr->memblock_next = 0;
2800           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2801
2802             /* remove memblock from the free list */
2803           current->next_free = header->next_free;
2804           if (header == free_header) mptr->freelist_head = header->next_free;
2805
2806           CmiUnlock(mptr->mempoolLock);
2807
2808           pool = (void*)tail;
2809           *mem_hndl = tail->mem_hndl;
2810           *size = tail->size;
2811           return pool;
2812         }
2813         CmiUnlock(mptr->mempoolLock);
2814     }
2815
2816       /* steal failed, deregister and free memblock now */
2817     int freed = 0;
2818     for (k=0; k<CmiMyNodeSize()+1; k++) {
2819         i = (CmiMyRank()+k)%CmiMyNodeSize();
2820         mempool_type *mptr = CpvAccessOther(mempool, i);
2821         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2822
2823         mempool_block *mempools_head = &(mptr->mempools_head);
2824         mempool_block *current = mempools_head;
2825         mempool_block *prev = NULL;
2826
2827         while (current) {
2828           int isfree = 0;
2829           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2830 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2831           mempool_header *cur = free_header;
2832           mempool_header *header;
2833           if (current != mempools_head) {
2834             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2835              /* search in free list */
2836             if (header->size == current->size - sizeof(mempool_block)) {
2837               cur = free_header;
2838               while (cur) {
2839                 if (cur->next_free == (char*)header-(char*)mptr) break;
2840                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2841               }
2842               if (cur != NULL) isfree = 1;
2843             }
2844           }
2845           if (isfree) {
2846               /* remove from free list */
2847             cur->next_free = header->next_free;
2848             if (header == free_header) mptr->freelist_head = header->next_free;
2849              // deregister
2850             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2851             GNI_RC_CHECK("steal Mempool de-register", status);
2852             mempool_block *ptr = current;
2853             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2854             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2855 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2856             freed += ptr->size;
2857             free(ptr);
2858              // try now
2859             if (freed > *size) {
2860               if (pool == NULL) {
2861                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2862                 CmiAssert(ret == 0);
2863               }
2864               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2865               if (status == GNI_RC_SUCCESS) {
2866                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2867 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2868                 return pool;
2869               }
2870 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2871             }
2872           }
2873           else {
2874              prev = current;
2875              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2876           }
2877         }
2878
2879         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2880     }
2881       /* still no luck registering pool */
2882     if (pool) free(pool);
2883     return NULL;
2884 }
2885 #endif
2886
2887 static CmiUInt8 total_mempool_size = 0;
2888 static CmiUInt8 total_mempool_calls = 0;
2889
2890 #if USE_LRTS_MEMPOOL
2891 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2892 {
2893     void *pool;
2894     int ret;
2895     gni_return_t status = GNI_RC_SUCCESS;
2896
2897     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
2898     if (*size < default_size) *size = default_size;
2899 #if LARGEPAGE
2900     // round up to be multiple of _tlbpagesize
2901     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
2902     *size = ALIGNHUGEPAGE(*size);
2903 #endif
2904     total_mempool_size += *size;
2905     total_mempool_calls += 1;
2906 #if   !LARGEPAGE
2907     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
2908     {
2909         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
2910         CmiAbort("alloc_mempool_block");
2911     }
2912 #endif
2913 #if LARGEPAGE
2914     pool = my_get_huge_pages(*size);
2915     ret = pool==NULL;
2916 #else
2917     ret = posix_memalign(&pool, ALIGNBUF, *size);
2918 #endif
2919     if (ret != 0) {
2920 #if CMK_SMP && STEAL_MEMPOOL
2921       pool = steal_mempool_block(size, mem_hndl);
2922       if (pool != NULL) return pool;
2923 #endif
2924       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2925       if (ret == ENOMEM)
2926         CmiAbort("alloc_mempool_block: out of memory.");
2927       else
2928         CmiAbort("alloc_mempool_block: posix_memalign failed");
2929     }
2930 #if LARGEPAGE
2931     CmiMemLock();
2932     register_count++;
2933     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, status);
2934     CmiMemUnlock();
2935     if(status != GNI_RC_SUCCESS) {
2936         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
2937 sweep_mempool(CpvAccess(mempool));
2938     }
2939     GNI_RC_CHECK("MEMORY_REGISTER", status);
2940 #else
2941     SetMemHndlZero((*mem_hndl));
2942 #endif
2943     return pool;
2944 }
2945
2946 // ptr is a block head pointer
2947 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2948 {
2949     if(!(IsMemHndlZero(mem_hndl)))
2950     {
2951         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
2952     }
2953 #if LARGEPAGE
2954     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
2955 #else
2956     free(ptr);
2957 #endif
2958 }
2959 #endif
2960
2961 void LrtsPreCommonInit(int everReturn){
2962 #if USE_LRTS_MEMPOOL
2963     CpvInitialize(mempool_type*, mempool);
2964     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
2965     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2966 #endif
2967 }
2968
2969 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2970 {
2971     register int            i;
2972     int                     rc;
2973     int                     device_id = 0;
2974     unsigned int            remote_addr;
2975     gni_cdm_handle_t        cdm_hndl;
2976     gni_return_t            status = GNI_RC_SUCCESS;
2977     uint32_t                vmdh_index = -1;
2978     uint8_t                 ptag;
2979     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2980     int                     first_spawned;
2981     int                     physicalID;
2982     char                   *env;
2983
2984     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2985     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2986     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2987    
2988     status = PMI_Init(&first_spawned);
2989     GNI_RC_CHECK("PMI_Init", status);
2990
2991     status = PMI_Get_size(&mysize);
2992     GNI_RC_CHECK("PMI_Getsize", status);
2993
2994     status = PMI_Get_rank(&myrank);
2995     GNI_RC_CHECK("PMI_getrank", status);
2996
2997     //physicalID = CmiPhysicalNodeID(myrank);
2998     
2999     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3000
3001     *myNodeID = myrank;
3002     *numNodes = mysize;
3003   
3004 #if MULTI_THREAD_SEND
3005     /* Currently, we only consider the case that comm. thread will only recv msgs */
3006     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
3007 #endif
3008
3009     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3010     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3011     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3012
3013     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3014     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3015     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3016
3017     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3018     if (env) useDynamicSMSG = 1;
3019     if (!useDynamicSMSG)
3020       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3021     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3022     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3023     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3024     
3025     if(myrank == 0)
3026     {
3027         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3028         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3029     }
3030 #ifdef USE_ONESIDED
3031     onesided_init(NULL, &onesided_hnd);
3032
3033     // this is a GNI test, so use the libonesided bypass functionality
3034     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3035     local_addr = gniGetNicAddress();
3036 #else
3037     ptag = get_ptag();
3038     cookie = get_cookie();
3039 #if 0
3040     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3041 #endif
3042     //Create and attach to the communication  domain */
3043     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3044     GNI_RC_CHECK("GNI_CdmCreate", status);
3045     //* device id The device id is the minor number for the device
3046     //that is assigned to the device by the system when the device is created.
3047     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3048     //where X is the device number 0 default 
3049     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3050     GNI_RC_CHECK("GNI_CdmAttach", status);
3051     local_addr = get_gni_nic_address(0);
3052 #endif
3053     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3054     _MEMCHECK(MPID_UGNI_AllAddr);
3055     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3056     /* create the local completion queue */
3057     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3058     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
3059     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3060     
3061     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
3062     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
3063     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3064
3065     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3066     GNI_RC_CHECK("Create CQ (rx)", status);
3067     
3068     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
3069     //GNI_RC_CHECK("Create Post CQ (rx)", status);
3070     
3071     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3072     //GNI_RC_CHECK("Create BTE CQ", status);
3073
3074     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3075     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3076     _MEMCHECK(ep_hndl_array);
3077 #if CMK_SMP && !COMM_THREAD_SEND
3078     tx_cq_lock  = CmiCreateLock();
3079     rx_cq_lock  = CmiCreateLock();
3080 #endif
3081     for (i=0; i<mysize; i++) {
3082         if(i == myrank) continue;
3083         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
3084         GNI_RC_CHECK("GNI_EpCreate ", status);   
3085         remote_addr = MPID_UGNI_AllAddr[i];
3086         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3087         GNI_RC_CHECK("GNI_EpBind ", status);   
3088     }
3089
3090     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3091     _init_smsg();
3092     PMI_Barrier();
3093
3094 #if     USE_LRTS_MEMPOOL
3095     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3096     if (env) {
3097         _totalmem = CmiReadSize(env);
3098         if (myrank == 0)
3099             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3100     }
3101
3102     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3103     if (env) _mempool_size = CmiReadSize(env);
3104     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3105         _mempool_size = CmiReadSize(env);
3106
3107
3108     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3109     if (env) {
3110         MAX_REG_MEM = CmiReadSize(env);
3111         user_set_flag = 1;
3112     }
3113     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3114         MAX_REG_MEM = CmiReadSize(env);
3115         user_set_flag = 1;
3116     }
3117
3118     env = getenv("CHARM_UGNI_SEND_MAX");
3119     if (env) {
3120         MAX_BUFF_SEND = CmiReadSize(env);
3121         user_set_flag = 1;
3122     }
3123     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3124         MAX_BUFF_SEND = CmiReadSize(env);
3125         user_set_flag = 1;
3126     }
3127
3128     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3129     if (env) {
3130         _mempool_size_limit = CmiReadSize(env);
3131     }
3132
3133     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3134     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3135
3136     if (myrank==0) {
3137         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3138         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3139         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3140             /* memblock can expand to BIG_MSG * 2 size */
3141             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3142             CmiAbort("mempool maximum size is too small. \n");
3143         }
3144 #if CMK_SMP && MULTI_THREAD_SEND
3145         printf("Charm++> worker thread sending messages\n");
3146 #elif CMK_SMP && COMM_THREAD_SEND
3147         printf("Charm++> only comm thread send/recv messages\n");
3148 #endif
3149     }
3150
3151 #endif     /* end of USE_LRTS_MEMPOOL */
3152
3153     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3154     if (env) {
3155         BIG_MSG = CmiReadSize(env);
3156         if (BIG_MSG < ONE_SEG)
3157           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3158     }
3159     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3160     if (env) {
3161         BIG_MSG_PIPELINE = atoi(env);
3162     }
3163
3164     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3165     if (env) _checkProgress = 0;
3166     if (mysize == 1) _checkProgress = 0;
3167
3168     
3169     /*
3170     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3171     if (env) 
3172         _tlbpagesize = CmiReadSize(env);
3173     */
3174     /* real gethugepagesize() is only available when hugetlb module linked */
3175     _tlbpagesize = gethugepagesize();
3176     if (myrank == 0) {
3177         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3178     }
3179
3180 #if LARGEPAGE
3181     if (_tlbpagesize == 4096) {
3182         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3183     }
3184 #endif
3185
3186     /* init DMA buffer for medium message */
3187
3188     //_init_DMA_buffer();
3189     
3190     free(MPID_UGNI_AllAddr);
3191 #if CMK_SMP
3192     sendRdmaBuf = PCQueueCreate();
3193 #else
3194     sendRdmaBuf = 0;
3195 #endif
3196 #if MACHINE_DEBUG_LOG
3197     char ln[200];
3198     sprintf(ln,"debugLog.%d",myrank);
3199     debugLog=fopen(ln,"w");
3200 #endif
3201
3202 //    NTK_Init();
3203 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3204 }
3205
3206 void* LrtsAlloc(int n_bytes, int header)
3207 {
3208     void *ptr = NULL;
3209 #if 0
3210     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3211 #endif
3212     if(n_bytes <= SMSG_MAX_MSG)
3213     {
3214         int totalsize = n_bytes+header;
3215         ptr = malloc(totalsize);
3216     }
3217     else {
3218         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3219 #if     USE_LRTS_MEMPOOL
3220         n_bytes = ALIGN64(n_bytes);
3221         if(n_bytes < BIG_MSG)
3222         {
3223             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3224             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3225         }else 
3226         {
3227 #if LARGEPAGE
3228             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3229             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3230             char *res = my_get_huge_pages(n_bytes);
3231 #else
3232             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3233 #endif
3234             if (res) ptr = res + ALIGNBUF - header;
3235         }
3236 #else
3237         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3238         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3239         ptr = res + ALIGNBUF - header;
3240 #endif
3241     }
3242     return ptr;
3243 }
3244
3245 void  LrtsFree(void *msg)
3246 {
3247     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3248     if (size <= SMSG_MAX_MSG)
3249         free(msg);
3250     else {
3251         size = ALIGN64(size);
3252         if(size>=BIG_MSG)
3253         {
3254 #if LARGEPAGE
3255             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3256             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3257 #else
3258             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3259 #endif
3260         }
3261         else {
3262 #if    USE_LRTS_MEMPOOL
3263 #if CMK_SMP
3264             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3265 #else
3266             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3267 #endif
3268 #else
3269             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3270 #endif
3271         }
3272     }
3273 }
3274
3275 void LrtsExit()
3276 {
3277     /* free memory ? */
3278 #if USE_LRTS_MEMPOOL
3279     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
3280     mempool_destroy(CpvAccess(mempool));
3281 #endif
3282     PMI_Finalize();
3283     exit(0);
3284 }
3285
3286 void LrtsDrainResources()
3287 {
3288     if(mysize == 1) return;
3289     while (
3290 #if CMK_USE_OOB
3291            !SendBufferMsg(&smsg_oob_queue) ||
3292 #endif
3293            !SendBufferMsg(&smsg_queue) 
3294 #if PIGGYBACK_ACK
3295         || !SendBufferMsg(&smsg_ack_queue)
3296 #endif
3297           )
3298     {
3299         if (useDynamicSMSG)
3300             PumpDatagramConnection();
3301         PumpNetworkSmsg();
3302         PumpLocalRdmaTransactions();
3303         SendRdmaMsg();
3304     }
3305     PMI_Barrier();
3306 }
3307
3308 void LrtsAbort(const char *message) {
3309     printf("CmiAbort is calling on PE:%d\n", myrank);
3310     CmiPrintStackTrace(0);
3311     PMI_Abort(-1, message);
3312 }
3313
3314 /**************************  TIMER FUNCTIONS **************************/
3315 #if CMK_TIMER_USE_SPECIAL
3316 /* MPI calls are not threadsafe, even the timer on some machines */
3317 static CmiNodeLock  timerLock = 0;
3318 static int _absoluteTime = 0;
3319 static int _is_global = 0;
3320 static struct timespec start_ts;
3321
3322 inline int CmiTimerIsSynchronized() {
3323     return 0;
3324 }
3325
3326 inline int CmiTimerAbsolute() {
3327     return _absoluteTime;
3328 }
3329
3330 double CmiStartTimer() {
3331     return 0.0;
3332 }
3333
3334 double CmiInitTime() {
3335     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3336 }
3337
3338 void CmiTimerInit(char **argv) {
3339     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3340     if (_absoluteTime && CmiMyPe() == 0)
3341         printf("Charm++> absolute  timer is used\n");
3342     
3343     _is_global = CmiTimerIsSynchronized();
3344
3345
3346     if (_is_global) {
3347         if (CmiMyRank() == 0) {
3348             clock_gettime(CLOCK_MONOTONIC, &start_ts);
3349         }
3350     } else { /* we don't have a synchronous timer, set our own start time */
3351         CmiBarrier();
3352         CmiBarrier();
3353         CmiBarrier();
3354         clock_gettime(CLOCK_MONOTONIC, &start_ts);
3355     }
3356     CmiNodeAllBarrier();          /* for smp */
3357 }
3358
3359 /**
3360  * Since the timerLock is never created, and is
3361  * always NULL, then all the if-condition inside
3362  * the timer functions could be disabled right
3363  * now in the case of SMP.
3364  */
3365 double CmiTimer(void) {
3366     struct timespec now_ts;
3367     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3368     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3369         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3370 }
3371
3372 double CmiWallTimer(void) {
3373     struct timespec now_ts;
3374     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3375     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3376         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
3377 }
3378
3379 double CmiCpuTimer(void) {
3380     struct timespec now_ts;
3381     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3382     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3383         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3384 }
3385
3386 #endif
3387 /************Barrier Related Functions****************/
3388
3389 int CmiBarrier()
3390 {
3391     gni_return_t status;
3392
3393 #if CMK_SMP
3394     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
3395     CmiNodeAllBarrier();
3396     if (CmiMyRank() == CmiMyNodeSize())
3397 #else
3398     if (CmiMyRank() == 0)
3399 #endif
3400     {
3401         /**
3402          *  The call of CmiBarrier is usually before the initialization
3403          *  of trace module of Charm++, therefore, the START_EVENT
3404          *  and END_EVENT are disabled here. -Chao Mei
3405          */
3406         /*START_EVENT();*/
3407         status = PMI_Barrier();
3408         GNI_RC_CHECK("PMI_Barrier", status);
3409         /*END_EVENT(10);*/
3410     }
3411     CmiNodeAllBarrier();
3412     return status;
3413
3414 }
3415 #if CMK_DIRECT
3416 #include "machine-cmidirect.c"
3417 #endif
3418 #if CMK_PERSISTENT_COMM
3419 #include "machine-persistent.c"
3420 #endif
3421
3422