6a83c15fdc541a6e1a1c6f839a42210c78862e26
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <gni_pub.h>
38 #include <pmi.h>
39
40 //#include <numatoolkit.h>
41
42 #include "converse.h"
43
44 #define     LARGEPAGE           0
45
46 #if LARGEPAGE
47 #include <hugetlbfs.h>
48 #endif
49
50 #if CMK_DIRECT
51 #include "cmidirect.h"
52 #endif
53 #define PRINT_SYH  0
54
55 #define USE_LRTS_MEMPOOL                  1
56
57 #define REMOTE_EVENT                      0
58
59 #define CMI_EXERT_SEND_CAP      0
60 #define CMI_EXERT_RECV_CAP      0
61
62 #if CMI_EXERT_SEND_CAP
63 #define SEND_CAP 16
64 #endif
65
66 #if CMI_EXERT_RECV_CAP
67 #define RECV_CAP 2
68 #endif
69
70 #if CMK_SMP
71 #define COMM_THREAD_SEND 1
72 //#define MULTI_THREAD_SEND 1
73 #endif
74
75 #if CMK_SMP
76 #define PIGGYBACK_ACK                        0
77 #endif
78
79 // Trace communication thread
80 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
81 #define TRACE_THRESHOLD     0.00005
82 #define CMI_MPI_TRACE_MOREDETAILED 0
83 #undef CMI_MPI_TRACE_USEREVENTS
84 #define CMI_MPI_TRACE_USEREVENTS 1
85 #else
86 #undef CMK_SMP_TRACE_COMMTHREAD
87 #define CMK_SMP_TRACE_COMMTHREAD 0
88 #endif
89
90 #define CMK_TRACE_COMMOVERHEAD 0
91 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
92 #undef CMI_MPI_TRACE_USEREVENTS
93 #define CMI_MPI_TRACE_USEREVENTS 1
94 #else
95 #undef CMK_TRACE_COMMOVERHEAD
96 #define CMK_TRACE_COMMOVERHEAD 0
97 #endif
98
99 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
100 CpvStaticDeclare(double, projTraceStart);
101 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
102 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
103 #else
104 #define  START_EVENT()
105 #define  END_EVENT(x)
106 #endif
107
108 #if USE_LRTS_MEMPOOL
109
110 #define oneMB (1024ll*1024)
111 #define oneGB (1024ll*1024*1024)
112
113 static CmiInt8 _mempool_size = 8*oneMB;
114 static CmiInt8 _expand_mem =  4*oneMB;
115 static CmiInt8 _mempool_size_limit = 0;
116
117 static CmiInt8 _totalmem = 0.8*oneGB;
118
119 #if LARGEPAGE
120 static int BIG_MSG  =  16*oneMB;
121 static int ONE_SEG  =  4*oneMB;
122 #else
123 static int BIG_MSG  =  4*oneMB;
124 static int ONE_SEG  =  2*oneMB;
125 #endif
126 static int BIG_MSG_PIPELINE = 4;
127
128 // dynamic flow control
129 static CmiInt8 buffered_send_msg = 0;
130 static CmiInt8 register_memory_size = 0;
131
132 #if LARGEPAGE
133 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
134 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
135 static CmiInt8  register_count = 0;
136 #else
137 #if CMK_SMP && COMM_THREAD_SEND 
138 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
139 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
140 #else
141 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
142 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
143 #endif
144
145
146 #endif
147
148 #endif     /* end USE_LRTS_MEMPOOL */
149
150 #if CMK_SMP && MULTI_THREAD_SEND
151 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
152 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
153 #else
154 #define     CMI_GNI_LOCK
155 #define     CMI_GNI_UNLOCK
156 #endif
157
158 static int _tlbpagesize = 4096;
159
160 //static int _smpd_count  = 0;
161
162 static int   user_set_flag  = 0;
163
164 static int _checkProgress = 1;             /* check deadlock */
165 static int _detected_hang = 0;
166
167 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
168
169 // dynamic SMSG
170 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
171
172 static int avg_smsg_connection = 32;
173 static int                 *smsg_connected_flag= 0;
174 static gni_smsg_attr_t     **smsg_attr_vector_local;
175 static gni_smsg_attr_t     **smsg_attr_vector_remote;
176 static gni_ep_handle_t     ep_hndl_unbound;
177 static gni_smsg_attr_t     send_smsg_attr;
178 static gni_smsg_attr_t     recv_smsg_attr;
179
180 typedef struct _dynamic_smsg_mailbox{
181    void     *mailbox_base;
182    int      size;
183    int      offset;
184    gni_mem_handle_t  mem_hndl;
185    struct      _dynamic_smsg_mailbox  *next;
186 }dynamic_smsg_mailbox_t;
187
188 static dynamic_smsg_mailbox_t  *mailbox_list;
189
190 int         rdma_id = 0;
191
192 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
193 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
194
195 #if PRINT_SYH
196 int         lrts_send_msg_id = 0;
197 int         lrts_local_done_msg = 0;
198 int         lrts_send_rdma_success = 0;
199 #endif
200
201 #include "machine.h"
202
203 #include "pcqueue.h"
204
205 #include "mempool.h"
206
207 #if CMK_PERSISTENT_COMM
208 #include "machine-persistent.h"
209 #endif
210
211 //#define  USE_ONESIDED 1
212 #ifdef USE_ONESIDED
213 //onesided implementation is wrong, since no place to restore omdh
214 #include "onesided.h"
215 onesided_hnd_t   onesided_hnd;
216 onesided_md_t    omdh;
217 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
218
219 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
220
221 #else
222 uint8_t   onesided_hnd, omdh;
223 #if REMOTE_EVENT
224 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
225          status = GNI_RC_ERROR_NOMEM;} \
226         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
227                 if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
228 #else
229 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
230     do {   \
231         if (register_memory_size + size >= MAX_REG_MEM) { \
232             status = GNI_RC_ERROR_NOMEM; \
233         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
234             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
235     } while(0)
236 #endif
237 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
238     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
239              register_memory_size -= size; \
240          else CmiAbort("MEM_DEregister");  \
241     } while (0)
242 #endif
243
244 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
245 #define   IncreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)++
246 #define   DecreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)--
247 #define   IncreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)++
248 #define   DecreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)--
249 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
250 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
251 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
252 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
253 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
254 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
255 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
256 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
257 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
258
259 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
260 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
261
262 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
263 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
264 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
265 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
266
267 #define ALIGNBUF                64
268
269 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
270 /* If SMSG is not used */
271
272 #define FMA_PER_CORE  1024
273 #define FMA_BUFFER_SIZE 1024
274
275 /* If SMSG is used */
276 static int  SMSG_MAX_MSG = 1024;
277 #define SMSG_MAX_CREDIT 72 
278
279 #define MSGQ_MAXSIZE       2048
280 /* large message transfer with FMA or BTE */
281 #define LRTS_GNI_RDMA_THRESHOLD  1024 
282
283 #if CMK_SMP
284 static int  REMOTE_QUEUE_ENTRIES=163840; 
285 static int LOCAL_QUEUE_ENTRIES=163840; 
286 #else
287 static int  REMOTE_QUEUE_ENTRIES=20480;
288 static int LOCAL_QUEUE_ENTRIES=20480; 
289 #endif
290
291 #define BIG_MSG_TAG             0x26
292 #define PUT_DONE_TAG            0x28
293 #define DIRECT_PUT_DONE_TAG     0x29
294 #define ACK_TAG                 0x30
295 /* SMSG is data message */
296 #define SMALL_DATA_TAG          0x31
297 #define SMALL_DATA_ACK_TAG      0x32
298 /* SMSG is a control message to initialize a BTE */
299 #define LMSG_INIT_TAG           0x39 
300
301 #define DEBUG
302 #ifdef GNI_RC_CHECK
303 #undef GNI_RC_CHECK
304 #endif
305 #ifdef DEBUG
306 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
307 #else
308 #define GNI_RC_CHECK(msg,rc)
309 #endif
310
311 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
312 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
313 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
314
315 static int useStaticMSGQ = 0;
316 static int useStaticFMA = 0;
317 static int mysize, myrank;
318 gni_nic_handle_t      nic_hndl;
319
320 typedef struct {
321     gni_mem_handle_t mdh;
322     uint64_t addr;
323 } mdh_addr_t ;
324 // this is related to dynamic SMSG
325
326 typedef struct mdh_addr_list{
327     gni_mem_handle_t mdh;
328    void *addr;
329     struct mdh_addr_list *next;
330 }mdh_addr_list_t;
331
332 static unsigned int         smsg_memlen;
333 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
334 mdh_addr_t          setup_mem;
335 mdh_addr_t          *smsg_connection_vec = 0;
336 gni_mem_handle_t    smsg_connection_memhndl;
337 static int          smsg_expand_slots = 10;
338 static int          smsg_available_slot = 0;
339 static void         *smsg_mailbox_mempool = 0;
340 mdh_addr_list_t     *smsg_dynamic_list = 0;
341
342 static void             *smsg_mailbox_base;
343 gni_msgq_attr_t         msgq_attrs;
344 gni_msgq_handle_t       msgq_handle;
345 gni_msgq_ep_attr_t      msgq_ep_attrs;
346 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
347
348 /* =====Beginning of Declarations of Machine Specific Variables===== */
349 static int cookie;
350 static int modes = 0;
351 static gni_cq_handle_t       smsg_rx_cqh = NULL;
352 static gni_cq_handle_t       smsg_tx_cqh = NULL;
353 static gni_cq_handle_t       post_rx_cqh = NULL;
354 static gni_cq_handle_t       post_tx_cqh = NULL;
355 static gni_ep_handle_t       *ep_hndl_array;
356 #if CMK_SMP && MULTI_THREAD_SEND
357 static CmiNodeLock           *ep_lock_array;
358 static CmiNodeLock           tx_cq_lock; 
359 static CmiNodeLock           rx_cq_lock;
360 static CmiNodeLock           *mempool_lock;
361 #endif
362
363 typedef struct msg_list
364 {
365     uint32_t destNode;
366     uint32_t size;
367     void *msg;
368     uint8_t tag;
369 #if !CMK_SMP
370     struct msg_list *next;
371 #endif
372 }MSG_LIST;
373
374
375 typedef struct control_msg
376 {
377     uint64_t            source_addr;    /* address from the start of buffer  */
378     uint64_t            dest_addr;      /* address from the start of buffer */
379     int                 total_length;   /* total length */
380     int                 length;         /* length of this packet */
381     uint8_t             seq_id;         //big message   0 meaning single message
382     gni_mem_handle_t    source_mem_hndl;
383     struct control_msg *next;
384 } CONTROL_MSG;
385
386 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
387
388 typedef struct ack_msg
389 {
390     uint64_t            source_addr;    /* address from the start of buffer  */
391 #if ! USE_LRTS_MEMPOOL
392     gni_mem_handle_t    source_mem_hndl;
393     int                 length;          /* total length */
394 #endif
395     struct ack_msg     *next;
396 } ACK_MSG;
397
398 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
399
400 #if CMK_DIRECT
401 typedef struct{
402     uint64_t    handler_addr;
403 }CMK_DIRECT_HEADER;
404
405 typedef struct {
406     char core[CmiMsgHeaderSizeBytes];
407     uint64_t handler;
408 }cmidirectMsg;
409
410 //SYH
411 CpvDeclare(int, CmiHandleDirectIdx);
412 void CmiHandleDirectMsg(cmidirectMsg* msg)
413 {
414
415     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
416    (*(_handle->callbackFnPtr))(_handle->callbackData);
417    CmiFree(msg);
418 }
419
420 void CmiDirectInit()
421 {
422     CpvInitialize(int,  CmiHandleDirectIdx);
423     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
424 }
425
426 #endif
427 typedef struct  rmda_msg
428 {
429     int                   destNode;
430     gni_post_descriptor_t *pd;
431 #if !CMK_SMP
432     struct  rmda_msg      *next;
433 #endif
434 }RDMA_REQUEST;
435
436
437 #if CMK_SMP
438 #define SMP_LOCKS               0
439 #define ONE_SEND_QUEUE                  0
440 PCQueue sendRdmaBuf;
441 typedef struct  msg_list_index
442 {
443     PCQueue     sendSmsgBuf;
444     int         pushed;
445     CmiNodeLock   lock;
446 } MSG_LIST_INDEX;
447 char                *destpe_avail;
448 #if  !ONE_SEND_QUEUE && SMP_LOCKS
449     PCQueue     nonEmptyQueues;
450 #endif
451 #else         /* non-smp */
452
453 static RDMA_REQUEST        *sendRdmaBuf = 0;
454 static RDMA_REQUEST        *sendRdmaTail = 0;
455 typedef struct  msg_list_index
456 {
457     int         next;
458     MSG_LIST    *sendSmsgBuf;
459     MSG_LIST    *tail;
460 } MSG_LIST_INDEX;
461
462 #endif
463
464 // buffered send queue
465 #if ! ONE_SEND_QUEUE
466 typedef struct smsg_queue
467 {
468     MSG_LIST_INDEX   *smsg_msglist_index;
469     int               smsg_head_index;
470 } SMSG_QUEUE;
471 #else
472 typedef struct smsg_queue
473 {
474     PCQueue       sendMsgBuf;
475 }  SMSG_QUEUE;
476 #endif
477
478 SMSG_QUEUE                  smsg_queue;
479 #if PIGGYBACK_ACK
480 SMSG_QUEUE                  smsg_ack_queue;
481 #endif
482 #if CMK_USE_OOB
483 SMSG_QUEUE                  smsg_oob_queue;
484 #endif
485
486 #if CMK_SMP
487
488 #define FreeMsgList(d)   free(d);
489 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
490
491 #else
492
493 static MSG_LIST       *msglist_freelist=0;
494
495 #define FreeMsgList(d)  \
496   do { \
497   (d)->next = msglist_freelist;\
498   msglist_freelist = d; \
499   } while (0)
500
501 #define MallocMsgList(d) \
502   do {  \
503   d = msglist_freelist;\
504   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
505              _MEMCHECK(d);\
506   } else msglist_freelist = d->next; \
507   d->next =0;  \
508   } while (0)
509
510 #endif
511
512 #if CMK_SMP
513
514 #define FreeControlMsg(d)      free(d);
515 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
516
517 #else
518
519 static CONTROL_MSG    *control_freelist=0;
520
521 #define FreeControlMsg(d)       \
522   do { \
523   (d)->next = control_freelist;\
524   control_freelist = d; \
525   } while (0);
526
527 #define MallocControlMsg(d) \
528   do {  \
529   d = control_freelist;\
530   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
531              _MEMCHECK(d);\
532   } else control_freelist = d->next;  \
533   } while (0);
534
535 #endif
536
537 #if CMK_SMP
538
539 #define FreeAckMsg(d)      free(d);
540 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
541
542 #else
543
544 static ACK_MSG        *ack_freelist=0;
545
546 #define FreeAckMsg(d)       \
547   do { \
548   (d)->next = ack_freelist;\
549   ack_freelist = d; \
550   } while (0)
551
552 #define MallocAckMsg(d) \
553   do { \
554   d = ack_freelist;\
555   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
556              _MEMCHECK(d);\
557   } else ack_freelist = d->next; \
558   } while (0)
559
560 #endif
561
562
563 # if CMK_SMP
564 #define FreeRdmaRequest(d)       free(d);
565 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
566 #else
567
568 static RDMA_REQUEST         *rdma_freelist = NULL;
569
570 #define FreeRdmaRequest(d)       \
571   do {  \
572   (d)->next = rdma_freelist;\
573   rdma_freelist = d;    \
574   } while (0)
575
576 #define MallocRdmaRequest(d) \
577   do {   \
578   d = rdma_freelist;\
579   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
580              _MEMCHECK(d);\
581   } else rdma_freelist = d->next; \
582   d->next =0;   \
583   } while (0)
584 #endif
585
586 /* reuse gni_post_descriptor_t */
587 static gni_post_descriptor_t *post_freelist=0;
588
589 #if  !CMK_SMP
590 #define FreePostDesc(d)       \
591     (d)->next_descr = post_freelist;\
592     post_freelist = d;
593
594 #define MallocPostDesc(d) \
595   d = post_freelist;\
596   if (d==0) { \
597      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
598      d->next_descr = 0;\
599       _MEMCHECK(d);\
600   } else post_freelist = d->next_descr;
601 #else
602
603 #define FreePostDesc(d)     free(d);
604 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
605
606 #endif
607
608
609 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
610 static int      buffered_smsg_counter = 0;
611
612 /* SmsgSend return success but message sent is not confirmed by remote side */
613 static MSG_LIST *buffered_fma_head = 0;
614 static MSG_LIST *buffered_fma_tail = 0;
615
616 /* functions  */
617 #define IsFree(a,ind)  !( a& (1<<(ind) ))
618 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
619 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
620
621 CpvDeclare(mempool_type*, mempool);
622
623 /* get the upper bound of log 2 */
624 int mylog2(int size)
625 {
626     int op = size;
627     unsigned int ret=0;
628     unsigned int mask = 0;
629     int i;
630     while(op>0)
631     {
632         op = op >> 1;
633         ret++;
634
635     }
636     for(i=1; i<ret; i++)
637     {
638         mask = mask << 1;
639         mask +=1;
640     }
641
642     ret -= ((size &mask) ? 0:1);
643     return ret;
644 }
645
646 static void
647 allgather(void *in,void *out, int len)
648 {
649     static int *ivec_ptr=NULL,already_called=0,job_size=0;
650     int i,rc;
651     int my_rank;
652     char *tmp_buf,*out_ptr;
653
654     if(!already_called) {
655
656         rc = PMI_Get_size(&job_size);
657         CmiAssert(rc == PMI_SUCCESS);
658         rc = PMI_Get_rank(&my_rank);
659         CmiAssert(rc == PMI_SUCCESS);
660
661         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
662         CmiAssert(ivec_ptr != NULL);
663
664         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
665         CmiAssert(rc == PMI_SUCCESS);
666
667         already_called = 1;
668
669     }
670
671     tmp_buf = (char *)malloc(job_size * len);
672     CmiAssert(tmp_buf);
673
674     rc = PMI_Allgather(in,tmp_buf,len);
675     CmiAssert(rc == PMI_SUCCESS);
676
677     out_ptr = out;
678
679     for(i=0;i<job_size;i++) {
680
681         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
682
683     }
684
685     free(tmp_buf);
686 }
687
688 static void
689 allgather_2(void *in,void *out, int len)
690 {
691     //PMI_Allgather is out of order
692     int i,rc, extend_len;
693     int  rank_index;
694     char *out_ptr, *out_ref;
695     char *in2;
696
697     extend_len = sizeof(int) + len;
698     in2 = (char*)malloc(extend_len);
699
700     memcpy(in2, &myrank, sizeof(int));
701     memcpy(in2+sizeof(int), in, len);
702
703     out_ptr = (char*)malloc(mysize*extend_len);
704
705     rc = PMI_Allgather(in2, out_ptr, extend_len);
706     GNI_RC_CHECK("allgather", rc);
707
708     out_ref = out;
709
710     for(i=0;i<mysize;i++) {
711         //rank index 
712         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
713         //copy to the rank index slot
714         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
715     }
716
717     free(out_ptr);
718     free(in2);
719
720 }
721
722 static unsigned int get_gni_nic_address(int device_id)
723 {
724     unsigned int address, cpu_id;
725     gni_return_t status;
726     int i, alps_dev_id=-1,alps_address=-1;
727     char *token, *p_ptr;
728
729     p_ptr = getenv("PMI_GNI_DEV_ID");
730     if (!p_ptr) {
731         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
732        
733         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
734     } else {
735         while ((token = strtok(p_ptr,":")) != NULL) {
736             alps_dev_id = atoi(token);
737             if (alps_dev_id == device_id) {
738                 break;
739             }
740             p_ptr = NULL;
741         }
742         CmiAssert(alps_dev_id != -1);
743         p_ptr = getenv("PMI_GNI_LOC_ADDR");
744         CmiAssert(p_ptr != NULL);
745         i = 0;
746         while ((token = strtok(p_ptr,":")) != NULL) {
747             if (i == alps_dev_id) {
748                 alps_address = atoi(token);
749                 break;
750             }
751             p_ptr = NULL;
752             ++i;
753         }
754         CmiAssert(alps_address != -1);
755         address = alps_address;
756     }
757     return address;
758 }
759
760 static uint8_t get_ptag(void)
761 {
762     char *p_ptr, *token;
763     uint8_t ptag;
764
765     p_ptr = getenv("PMI_GNI_PTAG");
766     CmiAssert(p_ptr != NULL);
767     token = strtok(p_ptr, ":");
768     ptag = (uint8_t)atoi(token);
769     return ptag;
770         
771 }
772
773 static uint32_t get_cookie(void)
774 {
775     uint32_t cookie;
776     char *p_ptr, *token;
777
778     p_ptr = getenv("PMI_GNI_COOKIE");
779     CmiAssert(p_ptr != NULL);
780     token = strtok(p_ptr, ":");
781     cookie = (uint32_t)atoi(token);
782
783     return cookie;
784 }
785
786 #if LARGEPAGE
787 #include <sys/stat.h>
788 #include <fcntl.h>
789 #include <sys/mman.h>
790
791 // size must be _tlbpagesize aligned
792 void *my_get_huge_pages(size_t size)
793 {
794     char filename[512];
795     int fd;
796     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
797     void *ptr = NULL;
798
799     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
800     fd = open(filename, O_RDWR | O_CREAT, mode);
801     if (fd == -1) {
802         CmiAbort("my_get_huge_pages: open filed");
803     }
804     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
805     if (ptr == MAP_FAILED) ptr = NULL;
806 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
807     close(fd);
808     unlink(filename);
809     return ptr;
810 }
811
812 void my_free_huge_pages(void *ptr, int size)
813 {
814 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
815     int ret = munmap(ptr, size);
816     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
817 }
818
819 #endif
820
821 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
822 /* TODO: add any that are related */
823 /* =====End of Definitions of Message-Corruption Related Macros=====*/
824
825
826 #include "machine-lrts.h"
827 #include "machine-common-core.c"
828
829 /* Network progress function is used to poll the network when for
830    messages. This flushes receive buffers on some  implementations*/
831 #if CMK_MACHINE_PROGRESS_DEFINED
832 void CmiMachineProgressImpl() {
833 }
834 #endif
835
836 static void SendRdmaMsg();
837 static void PumpNetworkSmsg();
838 static void PumpLocalRdmaTransactions();
839 static int SendBufferMsg(SMSG_QUEUE *queue);
840
841 #if MACHINE_DEBUG_LOG
842 FILE *debugLog = NULL;
843 static CmiInt8 buffered_recv_msg = 0;
844 int         lrts_smsg_success = 0;
845 int         lrts_received_msg = 0;
846 #endif
847
848 static void sweep_mempool(mempool_type *mptr)
849 {
850     int n = 0;
851     block_header *current = &(mptr->block_head);
852
853     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
854     while( current!= NULL) {
855         printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
856         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
857     }
858     printf("[n %d] sweep_mempool slot END.\n", myrank);
859 }
860
861 inline
862 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
863 {
864     block_header *current = *from;
865
866     //while(register_memory_size>= MAX_REG_MEM)
867     //{
868         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
869             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
870
871         *from = current;
872         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
873         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
874         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
875     //}
876     return GNI_RC_SUCCESS;
877 }
878
879 inline 
880 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl)
881 {
882     gni_return_t status = GNI_RC_SUCCESS;
883     //int size = GetMempoolsize(msg);
884     //void *blockaddr = GetMempoolBlockPtr(msg);
885     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
886    
887     block_header *current = &(mptr->block_head);
888     while(register_memory_size>= MAX_REG_MEM)
889     {
890         status = deregisterMemory(mptr, &current);
891         if (status != GNI_RC_SUCCESS) break;
892     }
893     if(register_memory_size>= MAX_REG_MEM) return status;
894
895     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
896     while(1)
897     {
898         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
899         if(status == GNI_RC_SUCCESS)
900         {
901             break;
902         }
903         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
904         {
905             CmiAbort("Memory registor for mempool fails\n");
906         }
907         else
908         {
909             status = deregisterMemory(mptr, &current);
910             if (status != GNI_RC_SUCCESS) break;
911         }
912     }; 
913     return status;
914 }
915
916 inline 
917 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t)
918 {
919     static int rank = -1;
920     int i;
921     gni_return_t status;
922     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
923     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
924     mempool_type *mptr;
925
926     status = registerFromMempool(mptr1, msg, size, t);
927     if (status == GNI_RC_SUCCESS) return status;
928 #if CMK_SMP 
929     for (i=0; i<CmiMyNodeSize()+1; i++) {
930       rank = (rank+1)%(CmiMyNodeSize()+1);
931       mptr = CpvAccessOther(mempool, rank);
932       if (mptr == mptr1) continue;
933       status = registerFromMempool(mptr, msg, size, t);
934       if (status == GNI_RC_SUCCESS) return status;
935     }
936 #endif
937     return  GNI_RC_ERROR_RESOURCE;
938 }
939
940 inline
941 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
942 {
943     MSG_LIST        *msg_tmp;
944     MallocMsgList(msg_tmp);
945     msg_tmp->destNode = destNode;
946     msg_tmp->size   = size;
947     msg_tmp->msg    = msg;
948     msg_tmp->tag    = tag;
949
950 #if !CMK_SMP
951     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
952         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
953         queue->smsg_head_index = destNode;
954         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
955     }else
956     {
957         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
958         queue->smsg_msglist_index[destNode].tail = msg_tmp;
959     }
960 #else
961 #if ONE_SEND_QUEUE
962     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
963 #else
964 #if SMP_LOCKS
965     CmiLock(queue->smsg_msglist_index[destNode].lock);
966     if(queue->smsg_msglist_index[destNode].pushed == 0)
967     {
968         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
969     }
970     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
971     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
972 #else
973     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
974 #endif
975 #endif
976 #endif
977 #if PRINT_SYH
978     buffered_smsg_counter++;
979 #endif
980 }
981
982 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
983 {
984     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
985 }
986
987 inline
988 static void setup_smsg_connection(int destNode)
989 {
990     mdh_addr_list_t  *new_entry = 0;
991     gni_post_descriptor_t *pd;
992     gni_smsg_attr_t      *smsg_attr;
993     gni_return_t status = GNI_RC_NOT_DONE;
994     RDMA_REQUEST        *rdma_request_msg;
995     
996     if(smsg_available_slot == smsg_expand_slots)
997     {
998         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
999         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1000         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1001
1002         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1003             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1004             GNI_MEM_READWRITE,   
1005             -1,
1006             &(new_entry->mdh));
1007         smsg_available_slot = 0; 
1008         new_entry->next = smsg_dynamic_list;
1009         smsg_dynamic_list = new_entry;
1010     }
1011     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1012     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1013     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1014     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1015     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1016     smsg_attr->buff_size = smsg_memlen;
1017     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1018     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1019     smsg_local_attr_vec[destNode] = smsg_attr;
1020     smsg_available_slot++;
1021     MallocPostDesc(pd);
1022     pd->type            = GNI_POST_FMA_PUT;
1023     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1024     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1025     pd->length          = sizeof(gni_smsg_attr_t);
1026     pd->local_addr      = (uint64_t) smsg_attr;
1027     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1028     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1029     pd->src_cq_hndl     = 0;
1030     pd->rdma_mode       = 0;
1031     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1032     print_smsg_attr(smsg_attr);
1033     if(status == GNI_RC_ERROR_RESOURCE )
1034     {
1035         MallocRdmaRequest(rdma_request_msg);
1036         rdma_request_msg->destNode = destNode;
1037         rdma_request_msg->pd = pd;
1038         /* buffer this request */
1039     }
1040 #if PRINT_SYH
1041     if(status != GNI_RC_SUCCESS)
1042        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1043     else
1044         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1045 #endif
1046 }
1047
1048 /* useDynamicSMSG */
1049 inline 
1050 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1051 {
1052     gni_return_t status = GNI_RC_NOT_DONE;
1053
1054     if(mailbox_list->offset == mailbox_list->size)
1055     {
1056         dynamic_smsg_mailbox_t *new_mailbox_entry;
1057         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1058         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1059         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1060         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1061         new_mailbox_entry->offset = 0;
1062         
1063         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1064             new_mailbox_entry->size, smsg_rx_cqh,
1065             GNI_MEM_READWRITE,   
1066             -1,
1067             &(new_mailbox_entry->mem_hndl));
1068
1069         GNI_RC_CHECK("register", status);
1070         new_mailbox_entry->next = mailbox_list;
1071         mailbox_list = new_mailbox_entry;
1072     }
1073     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1074     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1075     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1076     local_smsg_attr->mbox_offset = mailbox_list->offset;
1077     mailbox_list->offset += smsg_memlen;
1078     local_smsg_attr->buff_size = smsg_memlen;
1079     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1080     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1081 }
1082
1083 /* useDynamicSMSG */
1084 inline 
1085 static int connect_to(int destNode)
1086 {
1087     gni_return_t status = GNI_RC_NOT_DONE;
1088     CmiAssert(smsg_connected_flag[destNode] == 0);
1089     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1090     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1091     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1092     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1093     
1094     CMI_GNI_LOCK
1095     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1096     CMI_GNI_UNLOCK
1097     if (status == GNI_RC_ERROR_RESOURCE) {
1098       /* possibly destNode is making connection at the same time */
1099       free(smsg_attr_vector_local[destNode]);
1100       smsg_attr_vector_local[destNode] = NULL;
1101       free(smsg_attr_vector_remote[destNode]);
1102       smsg_attr_vector_remote[destNode] = NULL;
1103       mailbox_list->offset -= smsg_memlen;
1104       return 0;
1105     }
1106     GNI_RC_CHECK("GNI_Post", status);
1107     smsg_connected_flag[destNode] = 1;
1108     return 1;
1109 }
1110
1111 #if PIGGYBACK_ACK
1112 static void * piggyback_ack(int destNode, int msgsize, int *count)
1113 {
1114     int i;
1115     if (PCQueueEmpty(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)) return NULL;
1116     int len = PCQueueLength(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
1117     int piggycount = (SMSG_MAX_MSG - msgsize)/sizeof(uint64_t);
1118     if (piggycount > len+1) piggycount = len + 1;
1119     if (piggycount <= 5) return NULL;
1120     uint64_t * buf = (uint64_t*)CmiTmpAlloc(piggycount * sizeof(uint64_t));
1121     CmiAssert(buf != NULL);
1122     buf[0] = piggycount-1;
1123 //printf("[%d] piggyback_ack: %d\n", myrank, piggycount);
1124     for (i=0; i<piggycount-1; i++) {
1125         MSG_LIST *ptr = (MSG_LIST*)PCQueuePop(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
1126         CmiAssert(ptr != NULL);
1127         ACK_MSG *msg = ptr->msg;
1128         buf[i+1] = msg->source_addr;
1129         FreeAckMsg(msg);
1130         FreeMsgList(ptr);
1131     }
1132     *count = piggycount;
1133     return buf;
1134 }
1135
1136
1137 static void piggyback_ack_done(int destNode, uint64_t *buf, int done)
1138 {
1139     if (!done)
1140     {
1141         int i;
1142         for (i=0; i<buf[0]; i++) {
1143             MSG_LIST *msg_tmp;
1144             MallocMsgList(msg_tmp);
1145             ACK_MSG  *ack_msg;
1146             MallocAckMsg(ack_msg);
1147             ack_msg->source_addr = buf[i+1];
1148             msg_tmp->size = ACK_MSG_SIZE;
1149             msg_tmp->msg = ack_msg;
1150             msg_tmp->tag = ACK_TAG;
1151             msg_tmp->destNode = destNode;
1152             PCQueuePush(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1153         }
1154     }
1155     CmiTmpFree(buf);
1156 }
1157 #endif
1158
1159 inline 
1160 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
1161 {
1162     unsigned int          remote_address;
1163     uint32_t              remote_id;
1164     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1165     gni_smsg_attr_t       *smsg_attr;
1166     gni_post_descriptor_t *pd;
1167     gni_post_state_t      post_state;
1168     char                  *real_data; 
1169
1170     if (useDynamicSMSG) {
1171         switch (smsg_connected_flag[destNode]) {
1172         case 0: 
1173             connect_to(destNode);         /* continue to case 1 */
1174         case 1:                           /* pending connection, do nothing */
1175             status = GNI_RC_NOT_DONE;
1176             if(inbuff ==0)
1177                 buffer_small_msgs(queue, msg, size, destNode, tag);
1178             return status;
1179         }
1180     }
1181 #if CMK_SMP
1182 #if ! ONE_SEND_QUEUE
1183     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1184 #endif
1185     {
1186 #else
1187     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1188     {
1189 #endif
1190         uint64_t *buf = NULL;
1191         int bufsize = 0;
1192 #if PIGGYBACK_ACK
1193         if (tag == SMALL_DATA_TAG) {
1194             int nack = 0;
1195             buf = piggyback_ack(destNode, size, &nack);
1196             if (buf) {
1197                 tag = SMALL_DATA_ACK_TAG;
1198                 bufsize = nack * sizeof(uint64_t);
1199             }
1200         }
1201 #endif
1202         CMI_GNI_LOCK
1203 <<<<<<< HEAD
1204 #if CMK_SMP_TRACE_COMMTHREAD
1205         int oldpe = -1;
1206         int oldeventid = -1;
1207         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1208         { 
1209             START_EVENT();
1210             if ( tag == SMALL_DATA_TAG)
1211                 real_data = (char*)msg; 
1212             else 
1213                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1214             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1215             TRACE_COMM_SET_COMM_MSGID(real_data);
1216         }
1217 #endif
1218         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
1219 #if CMK_SMP_TRACE_COMMTHREAD
1220         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1221 #endif
1222 =======
1223         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
1224 >>>>>>> 6bc5ef7da3aa42d00053cc7680a14257efe4ee42
1225         CMI_GNI_UNLOCK
1226         if(status == GNI_RC_SUCCESS)
1227         {
1228 #if CMK_SMP_TRACE_COMMTHREAD
1229             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == SMALL_DATA_ACK_TAG)
1230             { 
1231                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1232             }
1233 #endif
1234             smsg_send_count ++;
1235         }else
1236             status = GNI_RC_ERROR_RESOURCE;
1237 #if PIGGYBACK_ACK
1238         if (buf) {
1239             piggyback_ack_done(destNode, buf, status==GNI_RC_SUCCESS);
1240             tag = SMALL_DATA_TAG;
1241         }
1242 #endif
1243     }
1244     if(status != GNI_RC_SUCCESS && inbuff ==0)
1245         buffer_small_msgs(queue, msg, size, destNode, tag);
1246     return status;
1247 }
1248
1249 inline 
1250 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1251 {
1252     /* construct a control message and send */
1253     CONTROL_MSG         *control_msg_tmp;
1254     MallocControlMsg(control_msg_tmp);
1255     control_msg_tmp->source_addr = (uint64_t)msg;
1256     control_msg_tmp->seq_id    = seqno;
1257     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1258 #if     USE_LRTS_MEMPOOL
1259     if(size < BIG_MSG)
1260     {
1261         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1262     }
1263     else
1264     {
1265         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1266         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1267         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1268     }
1269 #else
1270     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1271 #endif
1272     return control_msg_tmp;
1273 }
1274
1275 #define BLOCKING_SEND_CONTROL    0
1276
1277 // Large message, send control to receiver, receiver register memory and do a GET, 
1278 // return 1 - send no success
1279 inline
1280 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1281 {
1282     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1283     uint32_t            vmdh_index  = -1;
1284     int                 size;
1285     int                 offset = 0;
1286     uint64_t            source_addr;
1287     int                 register_size; 
1288     void                *msg;
1289
1290     size    =   control_msg_tmp->total_length;
1291     source_addr = control_msg_tmp->source_addr;
1292     register_size = control_msg_tmp->length;
1293
1294 #if  USE_LRTS_MEMPOOL
1295     if( control_msg_tmp->seq_id == 0 ){
1296 #if BLOCKING_SEND_CONTROL
1297         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1298             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1299                 LrtsAdvanceCommunication(0);
1300         }
1301 #endif
1302         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1303         {
1304             msg = (void*)source_addr;
1305             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1306             {
1307                 if(!inbuff)
1308                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1309                 return GNI_RC_ERROR_NOMEM;
1310             }
1311             //register the corresponding mempool
1312             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1313             if(status == GNI_RC_SUCCESS)
1314             {
1315                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1316             }
1317         }else
1318         {
1319             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1320             status = GNI_RC_SUCCESS;
1321         }
1322         if(NoMsgInSend( control_msg_tmp->source_addr))
1323             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1324         else
1325             register_size = 0;
1326     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1327     {
1328         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1329         source_addr += offset;
1330         size = control_msg_tmp->length;
1331 #if BLOCKING_SEND_CONTROL
1332         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1333             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1334                 LrtsAdvanceCommunication(0);
1335         }
1336 #endif
1337         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1338             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1339             {
1340                 if(!inbuff)
1341                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1342                 return GNI_RC_ERROR_NOMEM;
1343             }
1344             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1345             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1346         }
1347         else
1348         {
1349             status = GNI_RC_SUCCESS;
1350         }
1351         register_size = 0;  
1352     }
1353
1354     if(status == GNI_RC_SUCCESS)
1355     {
1356         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff);  
1357         if(status == GNI_RC_SUCCESS)
1358         {
1359             buffered_send_msg += register_size;
1360             if(control_msg_tmp->seq_id == 0)
1361             {
1362                 IncreaseMsgInSend(source_addr);
1363             }
1364             FreeControlMsg(control_msg_tmp);
1365             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1366         }else
1367             status = GNI_RC_ERROR_RESOURCE;
1368
1369     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1370     {
1371         CmiAbort("Memory registor for large msg\n");
1372     }else 
1373     {
1374         status = GNI_RC_ERROR_NOMEM; 
1375         if(!inbuff)
1376             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1377     }
1378     return status;
1379 #else
1380     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1381     if(status == GNI_RC_SUCCESS)
1382     {
1383         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0);  
1384         if(status == GNI_RC_SUCCESS)
1385         {
1386             FreeControlMsg(control_msg_tmp);
1387         }
1388     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1389     {
1390         CmiAbort("Memory registor for large msg\n");
1391     }else 
1392     {
1393         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1394     }
1395     return status;
1396 #endif
1397 }
1398
1399 inline void LrtsPrepareEnvelope(char *msg, int size)
1400 {
1401     CmiSetMsgSize(msg, size);
1402 }
1403
1404 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1405 {
1406     gni_return_t        status  =   GNI_RC_SUCCESS;
1407     uint8_t tag;
1408     CONTROL_MSG         *control_msg_tmp;
1409     int                 oob = ( mode & OUT_OF_BAND);
1410     SMSG_QUEUE          *queue;
1411
1412     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1413 #if CMK_USE_OOB
1414     queue = oob? &smsg_oob_queue : &smsg_queue;
1415 #else
1416     queue = &smsg_queue;
1417 #endif
1418
1419     LrtsPrepareEnvelope(msg, size);
1420
1421 #if PRINT_SYH
1422     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1423 #endif 
1424 #if CMK_SMP && COMM_THREAD_SEND
1425     if(size <= SMSG_MAX_MSG)
1426         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1427     else if (size < BIG_MSG) {
1428         control_msg_tmp =  construct_control_msg(size, msg, 0);
1429         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1430     }
1431     else {
1432           CmiSetMsgSeq(msg, 0);
1433           control_msg_tmp =  construct_control_msg(size, msg, 1);
1434           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1435     }
1436 #else   //non-smp, smp(worker sending)
1437     if(size <= SMSG_MAX_MSG)
1438     {
1439         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1440             CmiFree(msg);
1441     }
1442     else if (size < BIG_MSG) {
1443         control_msg_tmp =  construct_control_msg(size, msg, 0);
1444         send_large_messages(queue, destNode, control_msg_tmp, 0);
1445     }
1446     else {
1447 #if     USE_LRTS_MEMPOOL
1448         CmiSetMsgSeq(msg, 0);
1449         control_msg_tmp =  construct_control_msg(size, msg, 1);
1450         send_large_messages(queue, destNode, control_msg_tmp, 0);
1451 #else
1452         control_msg_tmp =  construct_control_msg(size, msg, 0);
1453         send_large_messages(queue, destNode, control_msg_tmp, 0);
1454 #endif
1455     }
1456 #endif
1457     return 0;
1458 }
1459
1460 static void    PumpDatagramConnection();
1461 static void registerUserTraceEvents() {
1462 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1463     traceRegisterUserEvent("setting up connections", 10);
1464     traceRegisterUserEvent("Receiving small msgs", 20);
1465     traceRegisterUserEvent("Release local transaction", 30);
1466     traceRegisterUserEvent("Sending buffered small msgs", 40);
1467     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1468 #endif
1469 }
1470
1471 static void ProcessDeadlock()
1472 {
1473     static CmiUInt8 *ptr = NULL;
1474     static CmiUInt8  last = 0, mysum, sum;
1475     static int count = 0;
1476     gni_return_t status;
1477     int i;
1478
1479 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1480 //sweep_mempool(CpvAccess(mempool));
1481     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1482     mysum = smsg_send_count + smsg_recv_count;
1483     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1484     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1485     GNI_RC_CHECK("PMI_Allgather", status);
1486     sum = 0;
1487     for (i=0; i<mysize; i++)  sum+= ptr[i];
1488     if (last == 0 || sum == last) 
1489         count++;
1490     else
1491         count = 0;
1492     last = sum;
1493     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1494     if (count == 2) { 
1495         /* detected twice, it is a real deadlock */
1496         if (myrank == 0)  {
1497             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1498             CmiAbort("Fatal> Deadlock detected.");
1499         }
1500
1501     }
1502     _detected_hang = 0;
1503 }
1504
1505 static void CheckProgress()
1506 {
1507     if (smsg_send_count == last_smsg_send_count &&
1508         smsg_recv_count == last_smsg_recv_count ) 
1509     {
1510         _detected_hang = 1;
1511 #if !CMK_SMP
1512         if (_detected_hang) ProcessDeadlock();
1513 #endif
1514
1515     }
1516     else {
1517         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1518         last_smsg_send_count = smsg_send_count;
1519         last_smsg_recv_count = smsg_recv_count;
1520         _detected_hang = 0;
1521     }
1522 }
1523
1524 static void set_limit()
1525 {
1526     //if (!user_set_flag && CmiMyRank() == 0) {
1527     if (CmiMyRank() == 0) {
1528         int mynode = CmiPhysicalNodeID(CmiMyPe());
1529         int numpes = CmiNumPesOnPhysicalNode(mynode);
1530         int numprocesses = numpes / CmiMyNodeSize();
1531         MAX_REG_MEM  = _totalmem / numprocesses;
1532         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1533         if (CmiMyPe() == 0)
1534            printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM, MAX_BUFF_SEND);
1535     }
1536 }
1537
1538 void LrtsPostCommonInit(int everReturn)
1539 {
1540 #if CMK_DIRECT
1541     CmiDirectInit();
1542 #endif
1543 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1544     CpvInitialize(double, projTraceStart);
1545     /* only PE 0 needs to care about registration (to generate sts file). */
1546     if (CmiMyPe() == 0) {
1547         registerMachineUserEventsFunction(&registerUserTraceEvents);
1548     }
1549 #endif
1550
1551 #if CMK_SMP
1552     CmiIdleState *s=CmiNotifyGetState();
1553     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1554     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1555 #else
1556     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1557     if (useDynamicSMSG)
1558     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1559 #endif
1560
1561     if (_checkProgress)
1562 #if CMK_SMP
1563     if (CmiMyRank() == 0)
1564 #endif
1565     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1566 #if !LARGEPAGE
1567     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1568 #endif
1569 }
1570
1571 /* this is called by worker thread */
1572 void LrtsPostNonLocal(){
1573 #if CMK_SMP
1574 #if MULTI_THREAD_SEND
1575     if(mysize == 1) return;
1576     PumpLocalRdmaTransactions();
1577 #if CMK_USE_OOB
1578     if (SendBufferMsg(&smsg_oob_queue) == 1)
1579 #endif
1580     SendBufferMsg(&smsg_queue);
1581 #if PIGGYBACK_ACK
1582     SendBufferMsg(&smsg_ack_queue);
1583 #endif
1584     SendRdmaMsg();
1585 #endif
1586 #endif
1587 }
1588
1589 /* useDynamicSMSG */
1590 static void    PumpDatagramConnection()
1591 {
1592     uint32_t          remote_address;
1593     uint32_t          remote_id;
1594     gni_return_t status;
1595     gni_post_state_t  post_state;
1596     uint64_t          datagram_id;
1597     int i;
1598
1599    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1600    {
1601        if (datagram_id >= mysize) {           /* bound endpoint */
1602            int pe = datagram_id - mysize;
1603            CMI_GNI_LOCK
1604            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1605            CMI_GNI_UNLOCK
1606            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1607            {
1608                CmiAssert(remote_id == pe);
1609                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1610                GNI_RC_CHECK("Dynamic SMSG Init", status);
1611 #if PRINT_SYH
1612                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1613 #endif
1614                CmiAssert(smsg_connected_flag[pe] == 1);
1615                smsg_connected_flag[pe] = 2;
1616            }
1617        }
1618        else {         /* unbound ep */
1619            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1620            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1621            {
1622                CmiAssert(remote_id<mysize);
1623                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1624                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1625                GNI_RC_CHECK("Dynamic SMSG Init", status);
1626 #if PRINT_SYH
1627                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1628 #endif
1629                smsg_connected_flag[remote_id] = 2;
1630
1631                alloc_smsg_attr(&send_smsg_attr);
1632                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1633                GNI_RC_CHECK("post unbound datagram", status);
1634            }
1635        }
1636    }
1637 }
1638
1639 /* pooling CQ to receive network message */
1640 static void PumpNetworkRdmaMsgs()
1641 {
1642     gni_cq_entry_t      event_data;
1643     gni_return_t        status;
1644
1645 }
1646
1647 inline 
1648 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1649 {
1650     RDMA_REQUEST        *rdma_request_msg;
1651     MallocRdmaRequest(rdma_request_msg);
1652     rdma_request_msg->destNode = inst_id;
1653     rdma_request_msg->pd = pd;
1654 #if CMK_SMP
1655     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1656 #else
1657     if(sendRdmaBuf == 0)
1658     {
1659         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1660     }else{
1661         sendRdmaTail->next = rdma_request_msg;
1662         sendRdmaTail =  rdma_request_msg;
1663     }
1664 #endif
1665
1666 }
1667
1668 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1669 static void PumpNetworkSmsg()
1670 {
1671     uint64_t            inst_id;
1672     int                 ret;
1673     gni_cq_entry_t      event_data;
1674     gni_return_t        status, status2;
1675     void                *header;
1676     uint8_t             msg_tag;
1677     int                 msg_nbytes;
1678     void                *msg_data;
1679     gni_mem_handle_t    msg_mem_hndl;
1680     gni_smsg_attr_t     *smsg_attr;
1681     gni_smsg_attr_t     *remote_smsg_attr;
1682     int                 init_flag;
1683     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1684     uint64_t            source_addr;
1685     SMSG_QUEUE         *queue = &smsg_queue;
1686 #if     CMK_DIRECT
1687     cmidirectMsg        *direct_msg;
1688 #endif
1689     while(1)
1690     {
1691         CMI_GNI_LOCK
1692         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1693         CMI_GNI_UNLOCK
1694         if(status != GNI_RC_SUCCESS)
1695             break;
1696         inst_id = GNI_CQ_GET_INST_ID(event_data);
1697         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1698 #if PRINT_SYH
1699         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1700 #endif
1701         if (useDynamicSMSG) {
1702             /* subtle: smsg may come before connection is setup */
1703             while (smsg_connected_flag[inst_id] != 2) 
1704                PumpDatagramConnection();
1705         }
1706         msg_tag = GNI_SMSG_ANY_TAG;
1707         while(1) {
1708             CMI_GNI_LOCK
1709             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1710             CMI_GNI_UNLOCK
1711             if (status != GNI_RC_SUCCESS)
1712                 break;
1713 #if PRINT_SYH
1714             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1715 #endif
1716             /* copy msg out and then put into queue (small message) */
1717             switch (msg_tag) {
1718 #if PIGGYBACK_ACK
1719             case SMALL_DATA_ACK_TAG:
1720             {
1721                 int i;
1722                 uint64_t *buf = (uint64_t*)header;
1723                 int piggycount = buf[0];
1724 //printf("[%d] got piggyback msg: %d\n", myrank, piggycount);
1725                 for (i=0; i<piggycount; i++) {
1726                     void *msg = (void*)(buf[i+1]);
1727                     CmiAssert(msg != NULL);
1728                     DecreaseMsgInSend(msg);
1729                     if(NoMsgInSend(msg))
1730                         buffered_send_msg -= GetMempoolsize(msg);
1731                     CmiFree(msg);
1732                 }
1733                 header = buf + piggycount + 1;
1734                 msg_nbytes -= (piggycount+1)*sizeof(uint64_t);
1735             }
1736 #endif
1737             case SMALL_DATA_TAG:
1738             {
1739                 START_EVENT();
1740                 msg_nbytes = CmiGetMsgSize(header);
1741                 msg_data    = CmiAlloc(msg_nbytes);
1742                 memcpy(msg_data, (char*)header, msg_nbytes);
1743 #if CMK_SMP_TRACE_COMMTHREAD
1744                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1745 #endif
1746                 handleOneRecvedMsg(msg_nbytes, msg_data);
1747                 break;
1748             }
1749             case LMSG_INIT_TAG:
1750             {
1751                 getLargeMsgRequest(header, inst_id);
1752                 break;
1753             }
1754             case ACK_TAG:   //msg fit into mempool
1755             {
1756                 /* Get is done, release message . Now put is not used yet*/
1757                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1758 #if ! USE_LRTS_MEMPOOL
1759                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1760 #else
1761                 DecreaseMsgInSend(msg);
1762 #endif
1763                 if(NoMsgInSend(msg))
1764                     buffered_send_msg -= GetMempoolsize(msg);
1765                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1766                 CmiFree(msg);
1767                 break;
1768             }
1769             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1770             {
1771                 header_tmp = (CONTROL_MSG *) header;
1772                 void *msg = (void*)(header_tmp->source_addr);
1773                 int cur_seq = CmiGetMsgSeq(msg);
1774                 int offset = ONE_SEG*(cur_seq+1);
1775                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1776                 buffered_send_msg -= header_tmp->length;
1777                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1778                 if (remain_size < 0) remain_size = 0;
1779                 CmiSetMsgSize(msg, remain_size);
1780                 if(remain_size <= 0) //transaction done
1781                 {
1782                     CmiFree(msg);
1783                 }else if (header_tmp->total_length > offset)
1784                 {
1785                     CmiSetMsgSeq(msg, cur_seq+1);
1786                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1787                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1788                     //send next seg
1789                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1790                          // pipelining
1791                     if (header_tmp->seq_id == 1) {
1792                       int i;
1793                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1794                         int seq = cur_seq+i+2;
1795                         CmiSetMsgSeq(msg, seq-1);
1796                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1797                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1798                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1799                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1800                       }
1801                     }
1802                 }
1803                 break;
1804             }
1805 #if CMK_PERSISTENT_COMM
1806             case PUT_DONE_TAG: //persistent message
1807                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1808                 int size = ((CONTROL_MSG *) header)->length;
1809                 CmiReference(msg);
1810                 handleOneRecvedMsg(size, msg); 
1811                 break;
1812 #endif
1813 #if CMK_DIRECT
1814             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1815                 //create a trigger message
1816                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
1817                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
1818                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
1819                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
1820                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1821                 break;
1822 #endif
1823             default: {
1824                 printf("weird tag problem\n");
1825                 CmiAbort("Unknown tag\n");
1826                      }
1827             }               // end switch
1828 #if PRINT_SYH
1829             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1830 #endif
1831             CMI_GNI_LOCK
1832             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1833             CMI_GNI_UNLOCK
1834             smsg_recv_count ++;
1835             msg_tag = GNI_SMSG_ANY_TAG;
1836         } //endwhile getNext
1837     }   //end while GetEvent
1838     if(status == GNI_RC_ERROR_RESOURCE)
1839     {
1840         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
1841         GNI_RC_CHECK("Smsg_rx_cq full", status);
1842     }
1843 }
1844
1845 static void printDesc(gni_post_descriptor_t *pd)
1846 {
1847     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1848 }
1849
1850 // for BIG_MSG called on receiver side for receiving control message
1851 // LMSG_INIT_TAG
1852 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1853 {
1854 #if     USE_LRTS_MEMPOOL
1855     CONTROL_MSG         *request_msg;
1856     gni_return_t        status = GNI_RC_SUCCESS;
1857     void                *msg_data;
1858     gni_post_descriptor_t *pd;
1859     gni_mem_handle_t    msg_mem_hndl;
1860     int source, size, transaction_size, offset = 0;
1861     size_t     register_size = 0;
1862
1863     // initial a get to transfer data from the sender side */
1864     request_msg = (CONTROL_MSG *) header;
1865     size = request_msg->total_length;
1866     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1867     if(request_msg->seq_id < 2)   {
1868         msg_data = CmiAlloc(size);
1869         CmiSetMsgSeq(msg_data, 0);
1870         _MEMCHECK(msg_data);
1871     }
1872     else {
1873         offset = ONE_SEG*(request_msg->seq_id-1);
1874         msg_data = (char*)request_msg->dest_addr + offset;
1875     }
1876    
1877     MallocPostDesc(pd);
1878     pd->cqwrite_value = request_msg->seq_id;
1879     if( request_msg->seq_id == 0)
1880     {
1881         pd->local_mem_hndl= GetMemHndl(msg_data);
1882         transaction_size = ALIGN64(size);
1883         if(IsMemHndlZero(pd->local_mem_hndl))
1884         {   
1885             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1886             if(status == GNI_RC_SUCCESS)
1887             {
1888                 pd->local_mem_hndl = GetMemHndl(msg_data);
1889             }
1890             else
1891             {
1892                 SetMemHndlZero(pd->local_mem_hndl);
1893             }
1894         }
1895         if(NoMsgInRecv( (void*)(msg_data)))
1896             register_size = GetMempoolsize((void*)(msg_data));
1897         else
1898             register_size = 0;
1899     }
1900     else{
1901         transaction_size = ALIGN64(request_msg->length);
1902         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1903         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1904         {
1905             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1906         }
1907     }
1908     pd->first_operand = ALIGN64(size);                   //  total length
1909
1910     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1911         pd->type            = GNI_POST_FMA_GET;
1912     else
1913         pd->type            = GNI_POST_RDMA_GET;
1914 #if REMOTE_EVENT
1915     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1916 #else
1917     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1918 #endif
1919     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1920     pd->length          = transaction_size;
1921     pd->local_addr      = (uint64_t) msg_data;
1922     pd->remote_addr     = request_msg->source_addr + offset;
1923     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1924     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1925     pd->rdma_mode       = 0;
1926     pd->amo_cmd         = 0;
1927
1928     //memory registration success
1929     if(status == GNI_RC_SUCCESS)
1930     {
1931         CMI_GNI_LOCK
1932         if(pd->type == GNI_POST_RDMA_GET) 
1933             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1934         else
1935             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1936         CMI_GNI_UNLOCK
1937
1938         if(status == GNI_RC_SUCCESS )
1939         {
1940             if(pd->cqwrite_value == 0)
1941             {
1942 #if MACHINE_DEBUG_LOG
1943                 buffered_recv_msg += register_size;
1944                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1945 #endif
1946                 IncreaseMsgInRecv(msg_data);
1947             }
1948         }
1949     }else
1950     {
1951         SetMemHndlZero((pd->local_mem_hndl));
1952     }
1953     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1954     {
1955         bufferRdmaMsg(inst_id, pd); 
1956     }else {
1957          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1958         GNI_RC_CHECK("GetLargeAFter posting", status);
1959     }
1960 #else
1961     CONTROL_MSG         *request_msg;
1962     gni_return_t        status;
1963     void                *msg_data;
1964     gni_post_descriptor_t *pd;
1965     RDMA_REQUEST        *rdma_request_msg;
1966     gni_mem_handle_t    msg_mem_hndl;
1967     //int source;
1968     // initial a get to transfer data from the sender side */
1969     request_msg = (CONTROL_MSG *) header;
1970     msg_data = CmiAlloc(request_msg->length);
1971     _MEMCHECK(msg_data);
1972
1973     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1974
1975     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1976     {
1977         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1978     }
1979
1980     MallocPostDesc(pd);
1981     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1982         pd->type            = GNI_POST_FMA_GET;
1983     else
1984         pd->type            = GNI_POST_RDMA_GET;
1985 #if REMOTE_EVENT
1986     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1987 #else
1988     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1989 #endif
1990     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1991     pd->length          = ALIGN64(request_msg->length);
1992     pd->local_addr      = (uint64_t) msg_data;
1993     pd->remote_addr     = request_msg->source_addr;
1994     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1995     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1996     pd->rdma_mode       = 0;
1997     pd->amo_cmd         = 0;
1998
1999     //memory registration successful
2000     if(status == GNI_RC_SUCCESS)
2001     {
2002         pd->local_mem_hndl  = msg_mem_hndl;
2003         CMI_GNI_LOCK
2004         if(pd->type == GNI_POST_RDMA_GET) 
2005             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2006         else
2007             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2008         CMI_GNI_UNLOCK
2009     }else
2010     {
2011         SetMemHndlZero(pd->local_mem_hndl);
2012     }
2013     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2014     {
2015         MallocRdmaRequest(rdma_request_msg);
2016         rdma_request_msg->next = 0;
2017         rdma_request_msg->destNode = inst_id;
2018         rdma_request_msg->pd = pd;
2019         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2020     }else {
2021         GNI_RC_CHECK("AFter posting", status);
2022     }
2023 #endif
2024 }
2025
2026 static void PumpLocalRdmaTransactions()
2027 {
2028     gni_cq_entry_t          ev;
2029     gni_return_t            status;
2030     uint64_t                type, inst_id;
2031     gni_post_descriptor_t   *tmp_pd;
2032     MSG_LIST                *ptr;
2033     CONTROL_MSG             *ack_msg_tmp;
2034     ACK_MSG                 *ack_msg;
2035     uint8_t                 msg_tag;
2036 #if CMK_DIRECT
2037     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2038 #endif
2039     SMSG_QUEUE         *queue = &smsg_queue;
2040
2041     while(1) {
2042         CMI_GNI_LOCK 
2043         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
2044         CMI_GNI_UNLOCK
2045         if(status != GNI_RC_SUCCESS) break;
2046         
2047         type = GNI_CQ_GET_TYPE(ev);
2048         if (type == GNI_CQ_EVENT_TYPE_POST)
2049         {
2050             inst_id     = GNI_CQ_GET_INST_ID(ev);
2051 #if PRINT_SYH
2052             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2053 #endif
2054             CMI_GNI_LOCK
2055             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
2056             CMI_GNI_UNLOCK
2057
2058             switch (tmp_pd->type) {
2059 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2060             case GNI_POST_RDMA_PUT:
2061 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2062                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2063 #endif
2064             case GNI_POST_FMA_PUT:
2065                 if(tmp_pd->amo_cmd == 1) {
2066                     //sender ACK to receiver to trigger it is done
2067                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2068                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2069                     msg_tag = DIRECT_PUT_DONE_TAG;
2070                 }
2071                 else {
2072                     CmiFree((void *)tmp_pd->local_addr);
2073                     MallocControlMsg(ack_msg_tmp);
2074                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2075                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2076                     msg_tag = PUT_DONE_TAG;
2077                 }
2078                 break;
2079 #endif
2080             case GNI_POST_RDMA_GET:
2081             case GNI_POST_FMA_GET:  {
2082 #if  ! USE_LRTS_MEMPOOL
2083                 MallocControlMsg(ack_msg_tmp);
2084                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2085                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2086                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2087                 msg_tag = ACK_TAG;  
2088 #else
2089                 int seq_id = tmp_pd->cqwrite_value;
2090                 if(seq_id > 0)      // BIG_MSG
2091                 {
2092                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2093                     MallocControlMsg(ack_msg_tmp);
2094                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2095                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2096                     ack_msg_tmp->seq_id = seq_id;
2097                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2098                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2099                     ack_msg_tmp->length = tmp_pd->length;
2100                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2101                     msg_tag = BIG_MSG_TAG; 
2102                 } 
2103                 else
2104                 {
2105                     MallocAckMsg(ack_msg);
2106                     ack_msg->source_addr = tmp_pd->remote_addr;
2107                     msg_tag = ACK_TAG;  
2108                     // ack_msg_tmp->dest_addr = tmp_pd->local_addr; ???
2109                 }
2110 #endif
2111                 break;
2112             }
2113             default:
2114                 CmiPrintf("type=%d\n", tmp_pd->type);
2115                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
2116             }      /* end of switch */
2117
2118 #if CMK_DIRECT
2119             if (tmp_pd->amo_cmd == 1) {
2120                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
2121                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2122             }
2123             else
2124 #endif
2125             if (msg_tag == ACK_TAG) {
2126 #if ! PIGGYBACK_ACK
2127                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
2128                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2129 #else
2130                 buffer_small_msgs(&smsg_ack_queue, ack_msg, ACK_MSG_SIZE, inst_id, msg_tag);
2131 #endif
2132             }
2133             else {
2134                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
2135                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2136             }
2137 #if CMK_PERSISTENT_COMM
2138             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2139 #endif
2140             {
2141                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2142 #if PRINT_SYH
2143                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2144 #endif
2145                     START_EVENT();
2146                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2147                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2148 #if MACHINE_DEBUG_LOG
2149                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2150                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2151                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2152 #endif
2153 #if CMK_SMP_TRACE_COMMTHREAD
2154                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2155 #endif
2156                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2157                 }else if(msg_tag == BIG_MSG_TAG){
2158                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2159                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2160                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2161                         START_EVENT();
2162 #if PRINT_SYH
2163                         printf("Pipeline msg done [%d]\n", myrank);
2164 #endif
2165 #if CMK_SMP_TRACE_COMMTHREAD
2166                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2167 #endif
2168                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2169                     }
2170                 }
2171             }
2172             FreePostDesc(tmp_pd);
2173         }
2174     } //end while
2175     if(status == GNI_RC_ERROR_RESOURCE)
2176     {
2177         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2178         GNI_RC_CHECK("Smsg_tx_cq full", status);
2179     }
2180 }
2181
2182 static void  SendRdmaMsg()
2183 {
2184     gni_return_t            status = GNI_RC_SUCCESS;
2185     gni_mem_handle_t        msg_mem_hndl;
2186     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2187     RDMA_REQUEST            *pre = 0;
2188     uint64_t                register_size = 0;
2189     void                    *msg;
2190     int                     i;
2191 #if CMK_SMP
2192     int len = PCQueueLength(sendRdmaBuf);
2193     for (i=0; i<len; i++)
2194     {
2195         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2196         if (ptr == NULL) break;
2197 #else
2198     ptr = sendRdmaBuf;
2199     while (ptr!=0)
2200     {
2201 #endif 
2202         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2203         gni_post_descriptor_t *pd = ptr->pd;
2204         status = GNI_RC_SUCCESS;
2205         
2206         if(pd->cqwrite_value == 0)
2207         {
2208             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2209             {
2210                 msg = (void*)(pd->local_addr);
2211                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
2212                 if(status == GNI_RC_SUCCESS)
2213                 {
2214                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2215                 }
2216             }else
2217             {
2218                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2219             }
2220             if(NoMsgInRecv( (void*)(pd->local_addr)))
2221                 register_size = GetMempoolsize((void*)(pd->local_addr));
2222             else
2223                 register_size = 0;
2224         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2225         {
2226             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
2227         }
2228         if(status == GNI_RC_SUCCESS)        //mem register good
2229         {
2230             CMI_GNI_LOCK
2231             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2232                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2233             else
2234                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2235             CMI_GNI_UNLOCK
2236             if(status == GNI_RC_SUCCESS)    //post good
2237             {
2238 #if !CMK_SMP
2239                 tmp_ptr = ptr;
2240                 if(pre != 0) {
2241                     pre->next = ptr->next;
2242                 }
2243                 else {
2244                     sendRdmaBuf = ptr->next;
2245                 }
2246                 ptr = ptr->next;
2247                 FreeRdmaRequest(tmp_ptr);
2248 #endif
2249                 if(pd->cqwrite_value == 0)
2250                 {
2251                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2252                 }
2253 #if MACHINE_DEBUG_LOG
2254                 buffered_recv_msg += register_size;
2255                 MACHSTATE(8, "GO request from buffered\n"); 
2256 #endif
2257             }else           // cannot post
2258             {
2259 #if CMK_SMP
2260                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2261 #else
2262                 pre = ptr;
2263                 ptr = ptr->next;
2264 #endif
2265                 break;
2266             }
2267         } else          //memory registration fails
2268         {
2269 #if CMK_SMP
2270             PCQueuePush(sendRdmaBuf, (char*)ptr);
2271 #else
2272             pre = ptr;
2273             ptr = ptr->next;
2274 #endif
2275         }
2276     } //end while
2277 #if ! CMK_SMP
2278     if(ptr == 0)
2279         sendRdmaTail = pre;
2280 #endif
2281 }
2282
2283 // return 1 if all messages are sent
2284 static int SendBufferMsg(SMSG_QUEUE *queue)
2285 {
2286     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2287     CONTROL_MSG         *control_msg_tmp;
2288     gni_return_t        status;
2289     int                 done = 1;
2290     uint64_t            register_size;
2291     void                *register_addr;
2292     int                 index_previous = -1;
2293 #if CMI_EXERT_SEND_CAP
2294     int                 sent_cnt = 0;
2295 #endif
2296
2297 #if CMK_SMP
2298     int          index = 0;
2299 #if ONE_SEND_QUEUE
2300     memset(destpe_avail, 0, mysize * sizeof(char));
2301     for (index=0; index<1; index++)
2302     {
2303         int i, len = PCQueueLength(queue->sendMsgBuf);
2304         for (i=0; i<len; i++) 
2305         {
2306             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2307             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2308                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2309                 continue;
2310             }
2311 #else
2312 #if SMP_LOCKS
2313     int nonempty = PCQueueLength(nonEmptyQueues);
2314     for(index =0; index<nonempty; index++)
2315     {
2316         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2317         PCQueue current_queue= current_list-> sendSmsgBuf;
2318         CmiLock(current_list->lock);
2319         int i, len = PCQueueLength(current_queue);
2320         current_list->pushed = 0;
2321         CmiUnlock(current_list->lock);
2322 #else
2323     for(index =0; index<mysize; index++)
2324     {
2325         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2326         int i, len = PCQueueLength(current_queue);
2327 #endif
2328         for (i=0; i<len; i++) 
2329         {
2330             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2331             if (ptr == 0) break;
2332 #endif
2333 #else
2334     int index = queue->smsg_head_index;
2335     while(index != -1)
2336     {
2337         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2338         pre = 0;
2339         while(ptr != 0)
2340         {
2341 #endif
2342             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2343             status = GNI_RC_ERROR_RESOURCE;
2344             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2345                 /* connection not exists yet */
2346             }
2347             else
2348             switch(ptr->tag)
2349             {
2350             case SMALL_DATA_TAG:
2351                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2352                 if(status == GNI_RC_SUCCESS)
2353                 {
2354                     CmiFree(ptr->msg);
2355                 }
2356                 break;
2357             case LMSG_INIT_TAG:
2358                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2359                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2360                 break;
2361             case ACK_TAG:
2362                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2363                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2364                 break;
2365             case BIG_MSG_TAG:
2366                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2367                 if(status == GNI_RC_SUCCESS)
2368                 {
2369                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2370                 }
2371                 break;
2372 #if CMK_DIRECT
2373             case DIRECT_PUT_DONE_TAG:
2374                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2375                 if(status == GNI_RC_SUCCESS)
2376                 {
2377                     free((CMK_DIRECT_HEADER*)ptr->msg);
2378                 }
2379                 break;
2380
2381 #endif
2382             default:
2383                 printf("Weird tag\n");
2384                 CmiAbort("should not happen\n");
2385             }       // end switch
2386             if(status == GNI_RC_SUCCESS)
2387             {
2388 #if !CMK_SMP
2389                 tmp_ptr = ptr;
2390                 if(pre)
2391                 {
2392                     ptr = pre ->next = ptr->next;
2393                 }else
2394                 {
2395                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2396                 }
2397                 FreeMsgList(tmp_ptr);
2398 #else
2399                 FreeMsgList(ptr);
2400 #endif
2401 #if PRINT_SYH
2402                 buffered_smsg_counter--;
2403                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2404 #endif
2405 #if CMI_EXERT_SEND_CAP
2406                 sent_cnt++;
2407                 if(sent_cnt == SEND_CAP)
2408                     break;
2409 #endif
2410             }else {
2411 #if CMK_SMP
2412 #if ONE_SEND_QUEUE
2413                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2414 #else
2415                 PCQueuePush(current_queue, (char*)ptr);
2416 #endif
2417 #else
2418                 pre = ptr;
2419                 ptr=ptr->next;
2420 #endif
2421                 done = 0;
2422                 if(status == GNI_RC_ERROR_RESOURCE)
2423                 {
2424 #if CMK_SMP && ONE_SEND_QUEUE 
2425                     destpe_avail[ptr->destNode] = 1;
2426 #else
2427                     break;
2428 #endif
2429                 }
2430             } 
2431         } //end while
2432 #if !CMK_SMP
2433         if(ptr == 0)
2434             queue->smsg_msglist_index[index].tail = pre;
2435         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2436         {
2437             if(index_previous != -1)
2438                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2439             else
2440                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2441         }
2442         else {
2443             index_previous = index;
2444         }
2445         index = queue->smsg_msglist_index[index].next;
2446 #else
2447 #if !ONE_SEND_QUEUE && SMP_LOCKS
2448         CmiLock(current_list->lock);
2449         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2450         {
2451             current_list->pushed = 1;
2452             PCQueuePush(nonEmptyQueues, current_list);
2453         }
2454         CmiUnlock(current_list->lock); 
2455 #endif
2456 #endif
2457
2458 #if CMI_EXERT_SEND_CAP
2459         if(sent_cnt == SEND_CAP)
2460                 break;
2461 #endif
2462     }   // end pooling for all cores
2463     return done;
2464 }
2465
2466 static void ProcessDeadlock();
2467 void LrtsAdvanceCommunication(int whileidle)
2468 {
2469     static int count = 0;
2470     /*  Receive Msg first */
2471 #if CMK_SMP_TRACE_COMMTHREAD
2472     double startT, endT;
2473 #endif
2474     if (useDynamicSMSG && whileidle)
2475     {
2476 #if CMK_SMP_TRACE_COMMTHREAD
2477         startT = CmiWallTimer();
2478 #endif
2479         PumpDatagramConnection();
2480 #if CMK_SMP_TRACE_COMMTHREAD
2481         endT = CmiWallTimer();
2482         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2483 #endif
2484     }
2485
2486 #if CMK_SMP_TRACE_COMMTHREAD
2487     startT = CmiWallTimer();
2488 #endif
2489     PumpNetworkSmsg();
2490     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2491 #if CMK_SMP_TRACE_COMMTHREAD
2492     endT = CmiWallTimer();
2493     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2494 #endif
2495
2496 #if CMK_SMP_TRACE_COMMTHREAD
2497     startT = CmiWallTimer();
2498 #endif
2499     PumpLocalRdmaTransactions();
2500     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2501 #if CMK_SMP_TRACE_COMMTHREAD
2502     endT = CmiWallTimer();
2503     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2504 #endif
2505     /* Send buffered Message */
2506 #if CMK_SMP_TRACE_COMMTHREAD
2507     startT = CmiWallTimer();
2508 #endif
2509 #if CMK_USE_OOB
2510     if (SendBufferMsg(&smsg_oob_queue) == 1)
2511 #endif
2512     {
2513 #if PIGGYBACK_ACK
2514     //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
2515     if (SendBufferMsg(&smsg_queue) == 1) {
2516         //if (count++ % 10 == 0) 
2517         SendBufferMsg(&smsg_ack_queue);
2518     }
2519 #else
2520     SendBufferMsg(&smsg_queue);
2521 #endif
2522     }
2523     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2524 #if CMK_SMP_TRACE_COMMTHREAD
2525     endT = CmiWallTimer();
2526     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2527 #endif
2528
2529 #if CMK_SMP_TRACE_COMMTHREAD
2530     startT = CmiWallTimer();
2531 #endif
2532     SendRdmaMsg();
2533     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2534 #if CMK_SMP_TRACE_COMMTHREAD
2535     endT = CmiWallTimer();
2536     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2537 #endif
2538
2539 #if CMK_SMP
2540     if (_detected_hang)  ProcessDeadlock();
2541 #endif
2542 }
2543
2544 /* useDynamicSMSG */
2545 static void _init_dynamic_smsg()
2546 {
2547     gni_return_t status;
2548     uint32_t     vmdh_index = -1;
2549     int i;
2550
2551     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2552     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2553     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2554     for(i=0; i<mysize; i++) {
2555         smsg_connected_flag[i] = 0;
2556         smsg_attr_vector_local[i] = NULL;
2557         smsg_attr_vector_remote[i] = NULL;
2558     }
2559     if(mysize <=512)
2560     {
2561         SMSG_MAX_MSG = 4096;
2562     }else if (mysize <= 4096)
2563     {
2564         SMSG_MAX_MSG = 4096/mysize * 1024;
2565     }else if (mysize <= 16384)
2566     {
2567         SMSG_MAX_MSG = 512;
2568     }else {
2569         SMSG_MAX_MSG = 256;
2570     }
2571
2572     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2573     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2574     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2575     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2576     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2577
2578     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2579     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2580     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2581     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2582     mailbox_list->offset = 0;
2583     mailbox_list->next = 0;
2584     
2585     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2586         mailbox_list->size, smsg_rx_cqh,
2587         GNI_MEM_READWRITE,   
2588         vmdh_index,
2589         &(mailbox_list->mem_hndl));
2590     GNI_RC_CHECK("MEMORY registration for smsg", status);
2591
2592     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2593     GNI_RC_CHECK("Unbound EP", status);
2594     
2595     alloc_smsg_attr(&send_smsg_attr);
2596
2597     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2598     GNI_RC_CHECK("post unbound datagram", status);
2599
2600       /* always pre-connect to proc 0 */
2601     //if (myrank != 0) connect_to(0);
2602 }
2603
2604 static void _init_static_smsg()
2605 {
2606     gni_smsg_attr_t      *smsg_attr;
2607     gni_smsg_attr_t      remote_smsg_attr;
2608     gni_smsg_attr_t      *smsg_attr_vec;
2609     gni_mem_handle_t     my_smsg_mdh_mailbox;
2610     int      ret, i;
2611     gni_return_t status;
2612     uint32_t              vmdh_index = -1;
2613     mdh_addr_t            base_infor;
2614     mdh_addr_t            *base_addr_vec;
2615     char *env;
2616
2617     if(mysize <=512)
2618     {
2619         SMSG_MAX_MSG = 1024;
2620     }else if (mysize <= 4096)
2621     {
2622         SMSG_MAX_MSG = 1024;
2623     }else if (mysize <= 16384)
2624     {
2625         SMSG_MAX_MSG = 512;
2626     }else {
2627         SMSG_MAX_MSG = 256;
2628     }
2629     
2630     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2631     if (env) SMSG_MAX_MSG = atoi(env);
2632     CmiAssert(SMSG_MAX_MSG > 0);
2633
2634     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2635     
2636     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2637     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2638     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2639     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2640     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2641     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2642     CmiAssert(ret == 0);
2643     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2644     
2645     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2646             smsg_memlen*(mysize), smsg_rx_cqh,
2647             GNI_MEM_READWRITE,   
2648             vmdh_index,
2649             &my_smsg_mdh_mailbox);
2650     register_memory_size += smsg_memlen*(mysize);
2651     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2652
2653     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2654     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2655
2656     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2657     base_infor.mdh =  my_smsg_mdh_mailbox;
2658     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2659
2660     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2661  
2662     for(i=0; i<mysize; i++)
2663     {
2664         if(i==myrank)
2665             continue;
2666         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2667         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2668         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2669         smsg_attr[i].mbox_offset = i*smsg_memlen;
2670         smsg_attr[i].buff_size = smsg_memlen;
2671         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2672         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2673     }
2674
2675     for(i=0; i<mysize; i++)
2676     {
2677         if (myrank == i) continue;
2678
2679         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2680         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2681         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2682         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2683         remote_smsg_attr.buff_size = smsg_memlen;
2684         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2685         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2686
2687         /* initialize the smsg channel */
2688         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2689         GNI_RC_CHECK("SMSG Init", status);
2690     } //end initialization
2691
2692     free(base_addr_vec);
2693     free(smsg_attr);
2694
2695     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2696     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2697
2698
2699 inline
2700 static void _init_send_queue(SMSG_QUEUE *queue)
2701 {
2702      int i;
2703 #if ONE_SEND_QUEUE
2704      queue->sendMsgBuf = PCQueueCreate();
2705      destpe_avail = (char*)malloc(mysize * sizeof(char));
2706 #else
2707      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2708 #if CMK_SMP && SMP_LOCKS
2709      nonEmptyQueues = PCQueueCreate();
2710 #endif
2711      for(i =0; i<mysize; i++)
2712      {
2713 #if CMK_SMP
2714          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2715 #if SMP_LOCKS
2716          queue->smsg_msglist_index[i].pushed = 0;
2717          queue->smsg_msglist_index[i].lock = CmiCreateLock();
2718 #endif
2719 #else
2720          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2721          queue->smsg_msglist_index[i].next = -1;
2722          queue->smsg_head_index = -1;
2723 #endif
2724         
2725      }
2726 #endif
2727 }
2728
2729 inline
2730 static void _init_smsg()
2731 {
2732     if(mysize > 1) {
2733         if (useDynamicSMSG)
2734             _init_dynamic_smsg();
2735         else
2736             _init_static_smsg();
2737     }
2738
2739     _init_send_queue(&smsg_queue);
2740 #if PIGGYBACK_ACK
2741     _init_send_queue(&smsg_ack_queue);
2742 #endif
2743 #if CMK_USE_OOB
2744     _init_send_queue(&smsg_oob_queue);
2745 #endif
2746 }
2747
2748 static void _init_static_msgq()
2749 {
2750     gni_return_t status;
2751     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2752     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2753     msgq_attrs.smsg_q_sz = 1;
2754     msgq_attrs.rcv_pool_sz = 1;
2755     msgq_attrs.num_msgq_eps = 2;
2756     msgq_attrs.nloc_insts = 8;
2757     msgq_attrs.modes = 0;
2758     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2759
2760     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2761     GNI_RC_CHECK("MSGQ Init", status);
2762
2763
2764 }
2765
2766 #if CMK_SMP && STEAL_MEMPOOL
2767 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2768 {
2769     void *pool = NULL;
2770     int i, k;
2771     // check other ranks
2772     for (k=0; k<CmiMyNodeSize()+1; k++) {
2773         i = (CmiMyRank()+k)%CmiMyNodeSize();
2774         if (i==CmiMyRank()) continue;
2775         mempool_type *mptr = CpvAccessOther(mempool, i);
2776         CmiLock(mptr->mempoolLock);
2777         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2778         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2779             CmiUnlock(mptr->mempoolLock);
2780             continue;
2781         }
2782         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2783         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2784             /* search in the free list */
2785           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2786           mempool_header *current = free_header;
2787           while (current) {
2788             if (current->next_free == (char*)header-(char*)mptr) break;
2789             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2790           }
2791           if (current == NULL) {         /* not found in free list */
2792             CmiUnlock(mptr->mempoolLock);
2793             continue;
2794           }
2795 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2796             /* search the previous memblock, and remove the tail */
2797           mempool_block *ptr = (mempool_block *)mptr;
2798           while (ptr) {
2799             if (ptr->memblock_next == mptr->memblock_tail) break;
2800             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2801           }
2802           CmiAssert(ptr!=NULL);
2803           ptr->memblock_next = 0;
2804           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2805
2806             /* remove memblock from the free list */
2807           current->next_free = header->next_free;
2808           if (header == free_header) mptr->freelist_head = header->next_free;
2809
2810           CmiUnlock(mptr->mempoolLock);
2811
2812           pool = (void*)tail;
2813           *mem_hndl = tail->mem_hndl;
2814           *size = tail->size;
2815           return pool;
2816         }
2817         CmiUnlock(mptr->mempoolLock);
2818     }
2819
2820       /* steal failed, deregister and free memblock now */
2821     int freed = 0;
2822     for (k=0; k<CmiMyNodeSize()+1; k++) {
2823         i = (CmiMyRank()+k)%CmiMyNodeSize();
2824         mempool_type *mptr = CpvAccessOther(mempool, i);
2825         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2826
2827         mempool_block *mempools_head = &(mptr->mempools_head);
2828         mempool_block *current = mempools_head;
2829         mempool_block *prev = NULL;
2830
2831         while (current) {
2832           int isfree = 0;
2833           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2834 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2835           mempool_header *cur = free_header;
2836           mempool_header *header;
2837           if (current != mempools_head) {
2838             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2839              /* search in free list */
2840             if (header->size == current->size - sizeof(mempool_block)) {
2841               cur = free_header;
2842               while (cur) {
2843                 if (cur->next_free == (char*)header-(char*)mptr) break;
2844                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2845               }
2846               if (cur != NULL) isfree = 1;
2847             }
2848           }
2849           if (isfree) {
2850               /* remove from free list */
2851             cur->next_free = header->next_free;
2852             if (header == free_header) mptr->freelist_head = header->next_free;
2853              // deregister
2854             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2855             GNI_RC_CHECK("steal Mempool de-register", status);
2856             mempool_block *ptr = current;
2857             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2858             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2859 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2860             freed += ptr->size;
2861             free(ptr);
2862              // try now
2863             if (freed > *size) {
2864               if (pool == NULL) {
2865                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2866                 CmiAssert(ret == 0);
2867               }
2868               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2869               if (status == GNI_RC_SUCCESS) {
2870                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2871 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2872                 return pool;
2873               }
2874 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2875             }
2876           }
2877           else {
2878              prev = current;
2879              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2880           }
2881         }
2882
2883         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2884     }
2885       /* still no luck registering pool */
2886     if (pool) free(pool);
2887     return NULL;
2888 }
2889 #endif
2890
2891 static CmiUInt8 total_mempool_size = 0;
2892 static CmiUInt8 total_mempool_calls = 0;
2893
2894 #if USE_LRTS_MEMPOOL
2895 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2896 {
2897     void *pool;
2898     int ret;
2899     gni_return_t status = GNI_RC_SUCCESS;
2900
2901     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
2902     if (*size < default_size) *size = default_size;
2903 #if LARGEPAGE
2904     // round up to be multiple of _tlbpagesize
2905     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
2906     *size = ALIGNHUGEPAGE(*size);
2907 #endif
2908     total_mempool_size += *size;
2909     total_mempool_calls += 1;
2910 #if   !LARGEPAGE
2911     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
2912     {
2913         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
2914         CmiAbort("alloc_mempool_block");
2915     }
2916 #endif
2917 #if LARGEPAGE
2918     pool = my_get_huge_pages(*size);
2919     ret = pool==NULL;
2920 #else
2921     ret = posix_memalign(&pool, ALIGNBUF, *size);
2922 #endif
2923     if (ret != 0) {
2924 #if CMK_SMP && STEAL_MEMPOOL
2925       pool = steal_mempool_block(size, mem_hndl);
2926       if (pool != NULL) return pool;
2927 #endif
2928       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2929       if (ret == ENOMEM)
2930         CmiAbort("alloc_mempool_block: out of memory.");
2931       else
2932         CmiAbort("alloc_mempool_block: posix_memalign failed");
2933     }
2934 #if LARGEPAGE
2935     CmiMemLock();
2936     register_count++;
2937     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, status);
2938     CmiMemUnlock();
2939     if(status != GNI_RC_SUCCESS) {
2940         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
2941 sweep_mempool(CpvAccess(mempool));
2942     }
2943     GNI_RC_CHECK("MEMORY_REGISTER", status);
2944 #else
2945     SetMemHndlZero((*mem_hndl));
2946 #endif
2947     return pool;
2948 }
2949
2950 // ptr is a block head pointer
2951 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2952 {
2953     if(!(IsMemHndlZero(mem_hndl)))
2954     {
2955         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
2956     }
2957 #if LARGEPAGE
2958     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
2959 #else
2960     free(ptr);
2961 #endif
2962 }
2963 #endif
2964
2965 void LrtsPreCommonInit(int everReturn){
2966 #if USE_LRTS_MEMPOOL
2967     CpvInitialize(mempool_type*, mempool);
2968     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
2969     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2970 #endif
2971 }
2972
2973 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2974 {
2975     register int            i;
2976     int                     rc;
2977     int                     device_id = 0;
2978     unsigned int            remote_addr;
2979     gni_cdm_handle_t        cdm_hndl;
2980     gni_return_t            status = GNI_RC_SUCCESS;
2981     uint32_t                vmdh_index = -1;
2982     uint8_t                 ptag;
2983     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2984     int                     first_spawned;
2985     int                     physicalID;
2986     char                   *env;
2987
2988     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2989     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2990     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2991    
2992     status = PMI_Init(&first_spawned);
2993     GNI_RC_CHECK("PMI_Init", status);
2994
2995     status = PMI_Get_size(&mysize);
2996     GNI_RC_CHECK("PMI_Getsize", status);
2997
2998     status = PMI_Get_rank(&myrank);
2999     GNI_RC_CHECK("PMI_getrank", status);
3000
3001     //physicalID = CmiPhysicalNodeID(myrank);
3002     
3003     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3004
3005     *myNodeID = myrank;
3006     *numNodes = mysize;
3007   
3008 #if MULTI_THREAD_SEND
3009     /* Currently, we only consider the case that comm. thread will only recv msgs */
3010     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
3011 #endif
3012
3013     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3014     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3015     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3016
3017     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3018     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3019     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3020
3021     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3022     if (env) useDynamicSMSG = 1;
3023     if (!useDynamicSMSG)
3024       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3025     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3026     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3027     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3028     
3029     if(myrank == 0)
3030     {
3031         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3032         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3033     }
3034 #ifdef USE_ONESIDED
3035     onesided_init(NULL, &onesided_hnd);
3036
3037     // this is a GNI test, so use the libonesided bypass functionality
3038     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3039     local_addr = gniGetNicAddress();
3040 #else
3041     ptag = get_ptag();
3042     cookie = get_cookie();
3043 #if 0
3044     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3045 #endif
3046     //Create and attach to the communication  domain */
3047     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3048     GNI_RC_CHECK("GNI_CdmCreate", status);
3049     //* device id The device id is the minor number for the device
3050     //that is assigned to the device by the system when the device is created.
3051     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3052     //where X is the device number 0 default 
3053     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3054     GNI_RC_CHECK("GNI_CdmAttach", status);
3055     local_addr = get_gni_nic_address(0);
3056 #endif
3057     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3058     _MEMCHECK(MPID_UGNI_AllAddr);
3059     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3060     /* create the local completion queue */
3061     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3062     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
3063     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3064     
3065     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
3066     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
3067     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3068
3069     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3070     GNI_RC_CHECK("Create CQ (rx)", status);
3071     
3072     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
3073     //GNI_RC_CHECK("Create Post CQ (rx)", status);
3074     
3075     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3076     //GNI_RC_CHECK("Create BTE CQ", status);
3077
3078     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3079     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3080     _MEMCHECK(ep_hndl_array);
3081 #if CMK_SMP && !COMM_THREAD_SEND
3082     tx_cq_lock  = CmiCreateLock();
3083     rx_cq_lock  = CmiCreateLock();
3084 #endif
3085     for (i=0; i<mysize; i++) {
3086         if(i == myrank) continue;
3087         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
3088         GNI_RC_CHECK("GNI_EpCreate ", status);   
3089         remote_addr = MPID_UGNI_AllAddr[i];
3090         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3091         GNI_RC_CHECK("GNI_EpBind ", status);   
3092     }
3093
3094     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3095     _init_smsg();
3096     PMI_Barrier();
3097
3098 #if     USE_LRTS_MEMPOOL
3099     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3100     if (env) {
3101         _totalmem = CmiReadSize(env);
3102         if (myrank == 0)
3103             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3104     }
3105
3106     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3107     if (env) _mempool_size = CmiReadSize(env);
3108     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3109         _mempool_size = CmiReadSize(env);
3110
3111
3112     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3113     if (env) {
3114         MAX_REG_MEM = CmiReadSize(env);
3115         user_set_flag = 1;
3116     }
3117     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3118         MAX_REG_MEM = CmiReadSize(env);
3119         user_set_flag = 1;
3120     }
3121
3122     env = getenv("CHARM_UGNI_SEND_MAX");
3123     if (env) {
3124         MAX_BUFF_SEND = CmiReadSize(env);
3125         user_set_flag = 1;
3126     }
3127     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3128         MAX_BUFF_SEND = CmiReadSize(env);
3129         user_set_flag = 1;
3130     }
3131
3132     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3133     if (env) {
3134         _mempool_size_limit = CmiReadSize(env);
3135     }
3136
3137     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3138     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3139
3140     if (myrank==0) {
3141         printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3142         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3143         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3144             /* memblock can expand to BIG_MSG * 2 size */
3145             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3146             CmiAbort("mempool maximum size is too small. \n");
3147         }
3148 #if CMK_SMP && MULTI_THREAD_SEND
3149         printf("Charm++> worker thread sending messages\n");
3150 #elif CMK_SMP && COMM_THREAD_SEND
3151         printf("Charm++> only comm thread send/recv messages\n");
3152 #endif
3153     }
3154
3155 #endif     /* end of USE_LRTS_MEMPOOL */
3156
3157     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3158     if (env) {
3159         BIG_MSG = CmiReadSize(env);
3160         if (BIG_MSG < ONE_SEG)
3161           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3162     }
3163     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3164     if (env) {
3165         BIG_MSG_PIPELINE = atoi(env);
3166     }
3167
3168     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3169     if (env) _checkProgress = 0;
3170     if (mysize == 1) _checkProgress = 0;
3171
3172     
3173     /*
3174     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3175     if (env) 
3176         _tlbpagesize = CmiReadSize(env);
3177     */
3178     /* real gethugepagesize() is only available when hugetlb module linked */
3179     _tlbpagesize = gethugepagesize();
3180     if (myrank == 0) {
3181         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3182     }
3183
3184 #if LARGEPAGE
3185     if (_tlbpagesize == 4096) {
3186         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3187     }
3188 #endif
3189
3190     /* init DMA buffer for medium message */
3191
3192     //_init_DMA_buffer();
3193     
3194     free(MPID_UGNI_AllAddr);
3195 #if CMK_SMP
3196     sendRdmaBuf = PCQueueCreate();
3197 #else
3198     sendRdmaBuf = 0;
3199 #endif
3200 #if MACHINE_DEBUG_LOG
3201     char ln[200];
3202     sprintf(ln,"debugLog.%d",myrank);
3203     debugLog=fopen(ln,"w");
3204 #endif
3205
3206 //    NTK_Init();
3207 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3208 }
3209
3210 void* LrtsAlloc(int n_bytes, int header)
3211 {
3212     void *ptr = NULL;
3213 #if 0
3214     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3215 #endif
3216     if(n_bytes <= SMSG_MAX_MSG)
3217     {
3218         int totalsize = n_bytes+header;
3219         ptr = malloc(totalsize);
3220     }
3221     else {
3222         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3223 #if     USE_LRTS_MEMPOOL
3224         n_bytes = ALIGN64(n_bytes);
3225         if(n_bytes < BIG_MSG)
3226         {
3227             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3228             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3229         }else 
3230         {
3231 #if LARGEPAGE
3232             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3233             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3234             char *res = my_get_huge_pages(n_bytes);
3235 #else
3236             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3237 #endif
3238             if (res) ptr = res + ALIGNBUF - header;
3239         }
3240 #else
3241         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3242         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3243         ptr = res + ALIGNBUF - header;
3244 #endif
3245     }
3246     return ptr;
3247 }
3248
3249 void  LrtsFree(void *msg)
3250 {
3251     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3252     if (size <= SMSG_MAX_MSG)
3253         free(msg);
3254     else {
3255         size = ALIGN64(size);
3256         if(size>=BIG_MSG)
3257         {
3258 #if LARGEPAGE
3259             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3260             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3261 #else
3262             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3263 #endif
3264         }
3265         else {
3266 #if    USE_LRTS_MEMPOOL
3267 #if CMK_SMP
3268             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3269 #else
3270             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3271 #endif
3272 #else
3273             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3274 #endif
3275         }
3276     }
3277 }
3278
3279 void LrtsExit()
3280 {
3281     /* free memory ? */
3282 #if USE_LRTS_MEMPOOL
3283     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
3284     mempool_destroy(CpvAccess(mempool));
3285 #endif
3286     PMI_Finalize();
3287     exit(0);
3288 }
3289
3290 void LrtsDrainResources()
3291 {
3292     if(mysize == 1) return;
3293     while (
3294 #if CMK_USE_OOB
3295            !SendBufferMsg(&smsg_oob_queue) ||
3296 #endif
3297            !SendBufferMsg(&smsg_queue) 
3298 #if PIGGYBACK_ACK
3299         || !SendBufferMsg(&smsg_ack_queue)
3300 #endif
3301           )
3302     {
3303         if (useDynamicSMSG)
3304             PumpDatagramConnection();
3305         PumpNetworkSmsg();
3306         PumpLocalRdmaTransactions();
3307         SendRdmaMsg();
3308     }
3309     PMI_Barrier();
3310 }
3311
3312 void LrtsAbort(const char *message) {
3313     printf("CmiAbort is calling on PE:%d\n", myrank);
3314     CmiPrintStackTrace(0);
3315     PMI_Abort(-1, message);
3316 }
3317
3318 /**************************  TIMER FUNCTIONS **************************/
3319 #if CMK_TIMER_USE_SPECIAL
3320 /* MPI calls are not threadsafe, even the timer on some machines */
3321 static CmiNodeLock  timerLock = 0;
3322 static int _absoluteTime = 0;
3323 static int _is_global = 0;
3324 static struct timespec start_ts;
3325
3326 inline int CmiTimerIsSynchronized() {
3327     return 0;
3328 }
3329
3330 inline int CmiTimerAbsolute() {
3331     return _absoluteTime;
3332 }
3333
3334 double CmiStartTimer() {
3335     return 0.0;
3336 }
3337
3338 double CmiInitTime() {
3339     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3340 }
3341
3342 void CmiTimerInit(char **argv) {
3343     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3344     if (_absoluteTime && CmiMyPe() == 0)
3345         printf("Charm++> absolute  timer is used\n");
3346     
3347     _is_global = CmiTimerIsSynchronized();
3348
3349
3350     if (_is_global) {
3351         if (CmiMyRank() == 0) {
3352             clock_gettime(CLOCK_MONOTONIC, &start_ts);
3353         }
3354     } else { /* we don't have a synchronous timer, set our own start time */
3355         CmiBarrier();
3356         CmiBarrier();
3357         CmiBarrier();
3358         clock_gettime(CLOCK_MONOTONIC, &start_ts);
3359     }
3360     CmiNodeAllBarrier();          /* for smp */
3361 }
3362
3363 /**
3364  * Since the timerLock is never created, and is
3365  * always NULL, then all the if-condition inside
3366  * the timer functions could be disabled right
3367  * now in the case of SMP.
3368  */
3369 double CmiTimer(void) {
3370     struct timespec now_ts;
3371     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3372     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3373         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3374 }
3375
3376 double CmiWallTimer(void) {
3377     struct timespec now_ts;
3378     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3379     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3380         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
3381 }
3382
3383 double CmiCpuTimer(void) {
3384     struct timespec now_ts;
3385     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3386     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3387         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3388 }
3389
3390 #endif
3391 /************Barrier Related Functions****************/
3392
3393 int CmiBarrier()
3394 {
3395     gni_return_t status;
3396
3397 #if CMK_SMP
3398     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
3399     CmiNodeAllBarrier();
3400     if (CmiMyRank() == CmiMyNodeSize())
3401 #else
3402     if (CmiMyRank() == 0)
3403 #endif
3404     {
3405         /**
3406          *  The call of CmiBarrier is usually before the initialization
3407          *  of trace module of Charm++, therefore, the START_EVENT
3408          *  and END_EVENT are disabled here. -Chao Mei
3409          */
3410         /*START_EVENT();*/
3411         status = PMI_Barrier();
3412         GNI_RC_CHECK("PMI_Barrier", status);
3413         /*END_EVENT(10);*/
3414     }
3415     CmiNodeAllBarrier();
3416     return status;
3417
3418 }
3419 #if CMK_DIRECT
3420 #include "machine-cmidirect.c"
3421 #endif
3422 #if CMK_PERSISTENT_COMM
3423 #include "machine-persistent.c"
3424 #endif
3425
3426