cbc39569325acce0f792694dc872a8506013b3fe
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37
38 #include <gni_pub.h>
39 #include <pmi.h>
40
41 #include "converse.h"
42
43 #if CMK_DIRECT
44 #include "cmidirect.h"
45 #endif
46 #define PRINT_SYH  0
47
48 #define USE_LRTS_MEMPOOL                  1
49 #define REMOTE_EVENT                      0
50
51 #define CMI_EXERT_SEND_CAP      0
52 #define CMI_EXERT_RECV_CAP      0
53
54 #if CMI_EXERT_SEND_CAP
55 #define SEND_CAP 16
56 #endif
57
58 #if CMI_EXERT_RECV_CAP
59 #define RECV_CAP 2
60 #endif
61
62 #if CMK_SMP
63 #define COMM_THREAD_SEND 1
64 //#define MULTI_THREAD_SEND 1
65 #endif
66
67 // Trace communication thread
68 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
69 #define TRACE_THRESHOLD     0.00005
70 #define CMI_MPI_TRACE_MOREDETAILED 0
71 #undef CMI_MPI_TRACE_USEREVENTS
72 #define CMI_MPI_TRACE_USEREVENTS 1
73 #else
74 #undef CMK_SMP_TRACE_COMMTHREAD
75 #define CMK_SMP_TRACE_COMMTHREAD 0
76 #endif
77
78 #define CMK_TRACE_COMMOVERHEAD 0
79 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
80 #undef CMI_MPI_TRACE_USEREVENTS
81 #define CMI_MPI_TRACE_USEREVENTS 1
82 #else
83 #undef CMK_TRACE_COMMOVERHEAD
84 #define CMK_TRACE_COMMOVERHEAD 0
85 #endif
86
87 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
88 CpvStaticDeclare(double, projTraceStart);
89 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
90 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
91 #else
92 #define  START_EVENT()
93 #define  END_EVENT(x)
94 #endif
95
96 #if USE_LRTS_MEMPOOL
97
98 #define oneMB (1024ll*1024)
99 #define oneGB (1024ll*1024*1024)
100
101 static CmiInt8 _mempool_size = 8*oneMB;
102 static CmiInt8 _expand_mem =  4*oneMB;
103 static CmiInt8 _mempool_size_limit = 0;
104
105 static CmiInt8 _totalmem = 0.8*oneGB;
106
107 static int BIG_MSG  =  4*oneMB;
108 static int ONE_SEG  =  2*oneMB;
109 static int BIG_MSG_PIPELINE = 4;
110
111 // dynamic flow control
112 static CmiInt8 buffered_send_msg = 0;
113 static CmiInt8  register_memory_size = 0;
114
115 #define     LARGEPAGE           0
116 #if LARGEPAGE
117 #if CMK_SMP && COMM_THREAD_SEND 
118 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
119 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
120 #else
121 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
122 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
123 #endif
124 #else
125 #if CMK_SMP && COMM_THREAD_SEND 
126 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
127 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
128 #else
129 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
130 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
131 #endif
132
133
134 #endif
135
136 #endif     /* end USE_LRTS_MEMPOOL */
137
138 #if CMK_SMP && MULTI_THREAD_SEND
139 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
140 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
141 #else
142 #define     CMI_GNI_LOCK
143 #define     CMI_GNI_UNLOCK
144 #endif
145
146 static int _tlbpagesize = 4096;
147
148 static int   user_set_flag  = 0;
149
150 static int _checkProgress = 1;             /* check deadlock */
151 static int _detected_hang = 0;
152
153 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
154
155 // dynamic SMSG
156 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
157
158 static int avg_smsg_connection = 32;
159 static int                 *smsg_connected_flag= 0;
160 static gni_smsg_attr_t     **smsg_attr_vector_local;
161 static gni_smsg_attr_t     **smsg_attr_vector_remote;
162 static gni_ep_handle_t     ep_hndl_unbound;
163 static gni_smsg_attr_t     send_smsg_attr;
164 static gni_smsg_attr_t     recv_smsg_attr;
165
166 typedef struct _dynamic_smsg_mailbox{
167    void     *mailbox_base;
168    int      size;
169    int      offset;
170    gni_mem_handle_t  mem_hndl;
171    struct      _dynamic_smsg_mailbox  *next;
172 }dynamic_smsg_mailbox_t;
173
174 static dynamic_smsg_mailbox_t  *mailbox_list;
175
176 int         rdma_id = 0;
177
178 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
179 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
180
181 #if PRINT_SYH
182 int         lrts_send_msg_id = 0;
183 int         lrts_local_done_msg = 0;
184 int         lrts_send_rdma_success = 0;
185 #endif
186
187 #include "machine.h"
188
189 #include "pcqueue.h"
190
191 #include "mempool.h"
192
193 #if CMK_PERSISTENT_COMM
194 #include "machine-persistent.h"
195 #endif
196
197 //#define  USE_ONESIDED 1
198 #ifdef USE_ONESIDED
199 //onesided implementation is wrong, since no place to restore omdh
200 #include "onesided.h"
201 onesided_hnd_t   onesided_hnd;
202 onesided_md_t    omdh;
203 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
204
205 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
206
207 #else
208 uint8_t   onesided_hnd, omdh;
209 #if REMOTE_EVENT
210 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
211          status = GNI_RC_ERROR_NOMEM;} \
212         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
213                 if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
214 #else
215 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
216     do {   \
217         if (register_memory_size + size >= MAX_REG_MEM) { \
218             status = GNI_RC_ERROR_NOMEM; \
219         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
220             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \ 
221     } while(0)
222 #endif
223 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
224     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
225              register_memory_size -= size; \
226          else CmiAbort("MEM_DEregister");  \
227     } while (0)
228 #endif
229
230 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
231 #define   IncreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)++
232 #define   DecreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)--
233 #define   IncreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)++
234 #define   DecreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)--
235 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
236 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
237 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
238 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
239 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
240 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
241 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
242 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
243 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
244
245 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
246 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
247
248 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
249 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
250 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
251 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
252
253 #define ALIGNBUF                64
254
255 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
256 /* If SMSG is not used */
257
258 #define FMA_PER_CORE  1024
259 #define FMA_BUFFER_SIZE 1024
260
261 /* If SMSG is used */
262 static int  SMSG_MAX_MSG = 1024;
263 #define SMSG_MAX_CREDIT 72 
264
265 #define MSGQ_MAXSIZE       2048
266 /* large message transfer with FMA or BTE */
267 #define LRTS_GNI_RDMA_THRESHOLD  1024 
268
269 #if CMK_SMP
270 static int  REMOTE_QUEUE_ENTRIES=163840; 
271 static int LOCAL_QUEUE_ENTRIES=163840; 
272 #else
273 static int  REMOTE_QUEUE_ENTRIES=20480;
274 static int LOCAL_QUEUE_ENTRIES=20480; 
275 #endif
276
277 #define BIG_MSG_TAG  0x26
278 #define PUT_DONE_TAG      0x28
279 #define DIRECT_PUT_DONE_TAG      0x29
280 #define ACK_TAG           0x30
281 /* SMSG is data message */
282 #define SMALL_DATA_TAG          0x31
283 /* SMSG is a control message to initialize a BTE */
284 #define MEDIUM_HEAD_TAG         0x32
285 #define MEDIUM_DATA_TAG         0x33
286 #define LMSG_INIT_TAG           0x39 
287 #define VERY_LMSG_INIT_TAG      0x40 
288 #define VERY_LMSG_TAG           0x41 
289
290 #define DEBUG
291 #ifdef GNI_RC_CHECK
292 #undef GNI_RC_CHECK
293 #endif
294 #ifdef DEBUG
295 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
296 #else
297 #define GNI_RC_CHECK(msg,rc)
298 #endif
299
300 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
301 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
302
303 static int useStaticMSGQ = 0;
304 static int useStaticFMA = 0;
305 static int mysize, myrank;
306 gni_nic_handle_t      nic_hndl;
307
308 typedef struct {
309     gni_mem_handle_t mdh;
310     uint64_t addr;
311 } mdh_addr_t ;
312 // this is related to dynamic SMSG
313
314 typedef struct mdh_addr_list{
315     gni_mem_handle_t mdh;
316    void *addr;
317     struct mdh_addr_list *next;
318 }mdh_addr_list_t;
319
320 static unsigned int         smsg_memlen;
321 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
322 mdh_addr_t          setup_mem;
323 mdh_addr_t          *smsg_connection_vec = 0;
324 gni_mem_handle_t    smsg_connection_memhndl;
325 static int          smsg_expand_slots = 10;
326 static int          smsg_available_slot = 0;
327 static void         *smsg_mailbox_mempool = 0;
328 mdh_addr_list_t     *smsg_dynamic_list = 0;
329
330 static void             *smsg_mailbox_base;
331 gni_msgq_attr_t         msgq_attrs;
332 gni_msgq_handle_t       msgq_handle;
333 gni_msgq_ep_attr_t      msgq_ep_attrs;
334 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
335
336 /* =====Beginning of Declarations of Machine Specific Variables===== */
337 static int cookie;
338 static int modes = 0;
339 static gni_cq_handle_t       smsg_rx_cqh = NULL;
340 static gni_cq_handle_t       smsg_tx_cqh = NULL;
341 static gni_cq_handle_t       post_rx_cqh = NULL;
342 static gni_cq_handle_t       post_tx_cqh = NULL;
343 static gni_ep_handle_t       *ep_hndl_array;
344 #if CMK_SMP && MULTI_THREAD_SEND
345 static CmiNodeLock           *ep_lock_array;
346 static CmiNodeLock           tx_cq_lock; 
347 static CmiNodeLock           rx_cq_lock;
348 static CmiNodeLock           *mempool_lock;
349 #endif
350
351 typedef struct msg_list
352 {
353     uint32_t destNode;
354     uint32_t size;
355     void *msg;
356     uint8_t tag;
357 #if !CMK_SMP
358     struct msg_list *next;
359 #endif
360 }MSG_LIST;
361
362
363 typedef struct control_msg
364 {
365     uint64_t            source_addr;    /* address from the start of buffer  */
366     uint64_t            dest_addr;      /* address from the start of buffer */
367     int                 total_length;   /* total length */
368     int                 length;         /* length of this packet */
369     uint8_t             seq_id;         //big message   0 meaning single message
370     gni_mem_handle_t    source_mem_hndl;
371     struct control_msg *next;
372 } CONTROL_MSG;
373
374 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
375
376 typedef struct ack_msg
377 {
378     uint64_t            source_addr;    /* address from the start of buffer  */
379 #if ! USE_LRTS_MEMPOOL
380     gni_mem_handle_t    source_mem_hndl;
381     int                 length;          /* total length */
382 #endif
383     struct ack_msg     *next;
384 } ACK_MSG;
385
386 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
387
388 #if CMK_DIRECT
389 typedef struct{
390     uint64_t    handler_addr;
391 }CMK_DIRECT_HEADER;
392
393 typedef struct {
394     char core[CmiMsgHeaderSizeBytes];
395     uint64_t handler;
396 }cmidirectMsg;
397
398 //SYH
399 CpvDeclare(int, CmiHandleDirectIdx);
400 void CmiHandleDirectMsg(cmidirectMsg* msg)
401 {
402
403     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
404    (*(_handle->callbackFnPtr))(_handle->callbackData);
405    CmiFree(msg);
406 }
407
408 void CmiDirectInit()
409 {
410     CpvInitialize(int,  CmiHandleDirectIdx);
411     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
412 }
413
414 #endif
415 typedef struct  rmda_msg
416 {
417     int                   destNode;
418     gni_post_descriptor_t *pd;
419 #if !CMK_SMP
420     struct  rmda_msg      *next;
421 #endif
422 }RDMA_REQUEST;
423
424
425 #if CMK_SMP
426 #define SMP_LOCKS               0
427 #define ONE_SEND_QUEUE                  0
428 PCQueue sendRdmaBuf;
429 typedef struct  msg_list_index
430 {
431     PCQueue     sendSmsgBuf;
432     int         pushed;
433     CmiNodeLock   lock;
434 } MSG_LIST_INDEX;
435 char                *destpe_avail;
436 #if  !ONE_SEND_QUEUE && SMP_LOCKS
437     PCQueue     nonEmptyQueues;
438 #endif
439 #else         /* non-smp */
440
441 static RDMA_REQUEST        *sendRdmaBuf = 0;
442 static RDMA_REQUEST        *sendRdmaTail = 0;
443 typedef struct  msg_list_index
444 {
445     int         next;
446     MSG_LIST    *sendSmsgBuf;
447     MSG_LIST    *tail;
448 } MSG_LIST_INDEX;
449
450 #endif
451
452 // buffered send queue
453 #if ! ONE_SEND_QUEUE
454 typedef struct smsg_queue
455 {
456     MSG_LIST_INDEX   *smsg_msglist_index;
457     int               smsg_head_index;
458 } SMSG_QUEUE;
459 #else
460 typedef struct smsg_queue
461 {
462     PCQueue       sendMsgBuf;
463 }  SMSG_QUEUE;
464 #endif
465
466 SMSG_QUEUE                  smsg_queue;
467 SMSG_QUEUE                  smsg_oob_queue;
468
469 #if CMK_SMP
470
471 #define FreeMsgList(d)   free(d);
472 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
473
474 #else
475
476 static MSG_LIST       *msglist_freelist=0;
477
478 #define FreeMsgList(d)  \
479   do { \
480   (d)->next = msglist_freelist;\
481   msglist_freelist = d; \
482   } while (0)
483
484 #define MallocMsgList(d) \
485   do {  \
486   d = msglist_freelist;\
487   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
488              _MEMCHECK(d);\
489   } else msglist_freelist = d->next; \
490   d->next =0;  \
491   } while (0)
492
493 #endif
494
495 #if CMK_SMP
496
497 #define FreeControlMsg(d)      free(d);
498 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
499
500 #else
501
502 static CONTROL_MSG    *control_freelist=0;
503
504 #define FreeControlMsg(d)       \
505   do { \
506   (d)->next = control_freelist;\
507   control_freelist = d; \
508   } while (0);
509
510 #define MallocControlMsg(d) \
511   do {  \
512   d = control_freelist;\
513   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
514              _MEMCHECK(d);\
515   } else control_freelist = d->next;  \
516   } while (0);
517
518 #endif
519
520 #if CMK_SMP
521
522 #define FreeAckMsg(d)      free(d);
523 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
524
525 #else
526
527 static ACK_MSG        *ack_freelist=0;
528
529 #define FreeAckMsg(d)       \
530   do { \
531   (d)->next = ack_freelist;\
532   ack_freelist = d; \
533   } while (0)
534
535 #define MallocAckMsg(d) \
536   do { \
537   d = ack_freelist;\
538   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
539              _MEMCHECK(d);\
540   } else ack_freelist = d->next; \
541   } while (0)
542
543 #endif
544
545
546 # if CMK_SMP
547 #define FreeRdmaRequest(d)       free(d);
548 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
549 #else
550
551 static RDMA_REQUEST         *rdma_freelist = NULL;
552
553 #define FreeRdmaRequest(d)       \
554   do {  \
555   (d)->next = rdma_freelist;\
556   rdma_freelist = d;    \
557   } while (0)
558
559 #define MallocRdmaRequest(d) \
560   do {   \
561   d = rdma_freelist;\
562   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
563              _MEMCHECK(d);\
564   } else rdma_freelist = d->next; \
565   d->next =0;   \
566   } while (0)
567 #endif
568
569 /* reuse gni_post_descriptor_t */
570 static gni_post_descriptor_t *post_freelist=0;
571
572 #if  !CMK_SMP
573 #define FreePostDesc(d)       \
574     (d)->next_descr = post_freelist;\
575     post_freelist = d;
576
577 #define MallocPostDesc(d) \
578   d = post_freelist;\
579   if (d==0) { \
580      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
581      d->next_descr = 0;\
582       _MEMCHECK(d);\
583   } else post_freelist = d->next_descr;
584 #else
585
586 #define FreePostDesc(d)     free(d);
587 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
588
589 #endif
590
591
592 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
593 static int      buffered_smsg_counter = 0;
594
595 /* SmsgSend return success but message sent is not confirmed by remote side */
596 static MSG_LIST *buffered_fma_head = 0;
597 static MSG_LIST *buffered_fma_tail = 0;
598
599 /* functions  */
600 #define IsFree(a,ind)  !( a& (1<<(ind) ))
601 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
602 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
603
604 CpvDeclare(mempool_type*, mempool);
605
606 /* get the upper bound of log 2 */
607 int mylog2(int size)
608 {
609     int op = size;
610     unsigned int ret=0;
611     unsigned int mask = 0;
612     int i;
613     while(op>0)
614     {
615         op = op >> 1;
616         ret++;
617
618     }
619     for(i=1; i<ret; i++)
620     {
621         mask = mask << 1;
622         mask +=1;
623     }
624
625     ret -= ((size &mask) ? 0:1);
626     return ret;
627 }
628
629 static void
630 allgather(void *in,void *out, int len)
631 {
632     static int *ivec_ptr=NULL,already_called=0,job_size=0;
633     int i,rc;
634     int my_rank;
635     char *tmp_buf,*out_ptr;
636
637     if(!already_called) {
638
639         rc = PMI_Get_size(&job_size);
640         CmiAssert(rc == PMI_SUCCESS);
641         rc = PMI_Get_rank(&my_rank);
642         CmiAssert(rc == PMI_SUCCESS);
643
644         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
645         CmiAssert(ivec_ptr != NULL);
646
647         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
648         CmiAssert(rc == PMI_SUCCESS);
649
650         already_called = 1;
651
652     }
653
654     tmp_buf = (char *)malloc(job_size * len);
655     CmiAssert(tmp_buf);
656
657     rc = PMI_Allgather(in,tmp_buf,len);
658     CmiAssert(rc == PMI_SUCCESS);
659
660     out_ptr = out;
661
662     for(i=0;i<job_size;i++) {
663
664         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
665
666     }
667
668     free(tmp_buf);
669 }
670
671 static void
672 allgather_2(void *in,void *out, int len)
673 {
674     //PMI_Allgather is out of order
675     int i,rc, extend_len;
676     int  rank_index;
677     char *out_ptr, *out_ref;
678     char *in2;
679
680     extend_len = sizeof(int) + len;
681     in2 = (char*)malloc(extend_len);
682
683     memcpy(in2, &myrank, sizeof(int));
684     memcpy(in2+sizeof(int), in, len);
685
686     out_ptr = (char*)malloc(mysize*extend_len);
687
688     rc = PMI_Allgather(in2, out_ptr, extend_len);
689     GNI_RC_CHECK("allgather", rc);
690
691     out_ref = out;
692
693     for(i=0;i<mysize;i++) {
694         //rank index 
695         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
696         //copy to the rank index slot
697         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
698     }
699
700     free(out_ptr);
701     free(in2);
702
703 }
704
705 static unsigned int get_gni_nic_address(int device_id)
706 {
707     unsigned int address, cpu_id;
708     gni_return_t status;
709     int i, alps_dev_id=-1,alps_address=-1;
710     char *token, *p_ptr;
711
712     p_ptr = getenv("PMI_GNI_DEV_ID");
713     if (!p_ptr) {
714         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
715        
716         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
717     } else {
718         while ((token = strtok(p_ptr,":")) != NULL) {
719             alps_dev_id = atoi(token);
720             if (alps_dev_id == device_id) {
721                 break;
722             }
723             p_ptr = NULL;
724         }
725         CmiAssert(alps_dev_id != -1);
726         p_ptr = getenv("PMI_GNI_LOC_ADDR");
727         CmiAssert(p_ptr != NULL);
728         i = 0;
729         while ((token = strtok(p_ptr,":")) != NULL) {
730             if (i == alps_dev_id) {
731                 alps_address = atoi(token);
732                 break;
733             }
734             p_ptr = NULL;
735             ++i;
736         }
737         CmiAssert(alps_address != -1);
738         address = alps_address;
739     }
740     return address;
741 }
742
743 static uint8_t get_ptag(void)
744 {
745     char *p_ptr, *token;
746     uint8_t ptag;
747
748     p_ptr = getenv("PMI_GNI_PTAG");
749     CmiAssert(p_ptr != NULL);
750     token = strtok(p_ptr, ":");
751     ptag = (uint8_t)atoi(token);
752     return ptag;
753         
754 }
755
756 static uint32_t get_cookie(void)
757 {
758     uint32_t cookie;
759     char *p_ptr, *token;
760
761     p_ptr = getenv("PMI_GNI_COOKIE");
762     CmiAssert(p_ptr != NULL);
763     token = strtok(p_ptr, ":");
764     cookie = (uint32_t)atoi(token);
765
766     return cookie;
767 }
768
769 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
770 /* TODO: add any that are related */
771 /* =====End of Definitions of Message-Corruption Related Macros=====*/
772
773
774 #include "machine-lrts.h"
775 #include "machine-common-core.c"
776
777 /* Network progress function is used to poll the network when for
778    messages. This flushes receive buffers on some  implementations*/
779 #if CMK_MACHINE_PROGRESS_DEFINED
780 void CmiMachineProgressImpl() {
781 }
782 #endif
783
784 static void SendRdmaMsg();
785 static void PumpNetworkSmsg();
786 static void PumpLocalRdmaTransactions();
787 static int SendBufferMsg(SMSG_QUEUE *queue);
788
789 #if MACHINE_DEBUG_LOG
790 FILE *debugLog = NULL;
791 static CmiInt8 buffered_recv_msg = 0;
792 int         lrts_smsg_success = 0;
793 int         lrts_received_msg = 0;
794 #endif
795
796 static void sweep_mempool(mempool_type *mptr)
797 {
798     block_header *current = &(mptr->block_head);
799
800     printf("[n %d] sweep_mempool slot START.\n", myrank);
801     while( current!= NULL) {
802         printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
803         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
804     }
805     printf("[n %d] sweep_mempool slot END.\n", myrank);
806 }
807
808 inline
809 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
810 {
811     block_header *current = *from;
812
813     //while(register_memory_size>= MAX_REG_MEM)
814     //{
815         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
816             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
817
818         *from = current;
819         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
820         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
821         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
822     //}
823     return GNI_RC_SUCCESS;
824 }
825
826 inline 
827 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
828 {
829     gni_return_t status = GNI_RC_SUCCESS;
830     //int size = GetMempoolsize(msg);
831     //void *blockaddr = GetMempoolBlockPtr(msg);
832     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
833    
834     block_header *current = &(mptr->block_head);
835     while(register_memory_size>= MAX_REG_MEM)
836     {
837         status = deregisterMemory(mptr, &current);
838         if (status != GNI_RC_SUCCESS) break;
839     }
840     if(register_memory_size>= MAX_REG_MEM) return status;
841
842     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
843     while(1)
844     {
845         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
846         if(status == GNI_RC_SUCCESS)
847         {
848             break;
849         }
850         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
851         {
852             CmiAbort("Memory registor for mempool fails\n");
853         }
854         else
855         {
856             status = deregisterMemory(mptr, &current);
857             if (status != GNI_RC_SUCCESS) break;
858         }
859     }; 
860     return status;
861 }
862
863 inline 
864 static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
865 {
866     static int rank = -1;
867     int i;
868     gni_return_t status;
869     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
870     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
871     mempool_type *mptr;
872
873     status = registerFromMempool(mptr1, msg, size, t);
874     if (status == GNI_RC_SUCCESS) return status;
875 #if CMK_SMP 
876     for (i=0; i<CmiMyNodeSize()+1; i++) {
877       rank = (rank+1)%(CmiMyNodeSize()+1);
878       mptr = CpvAccessOther(mempool, rank);
879       if (mptr == mptr1) continue;
880       status = registerFromMempool(mptr, msg, size, t);
881       if (status == GNI_RC_SUCCESS) return status;
882     }
883 #endif
884     return  GNI_RC_ERROR_RESOURCE;
885 }
886
887 inline
888 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
889 {
890     MSG_LIST        *msg_tmp;
891     MallocMsgList(msg_tmp);
892     msg_tmp->destNode = destNode;
893     msg_tmp->size   = size;
894     msg_tmp->msg    = msg;
895     msg_tmp->tag    = tag;
896
897 #if !CMK_SMP
898     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
899         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
900         queue->smsg_head_index = destNode;
901         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
902     }else
903     {
904         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
905         queue->smsg_msglist_index[destNode].tail = msg_tmp;
906     }
907 #else
908 #if ONE_SEND_QUEUE
909     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
910 #else
911 #if SMP_LOCKS
912     CmiLock(queue->smsg_msglist_index[destNode].lock);
913     if(queue->smsg_msglist_index[destNode].pushed == 0)
914     {
915         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
916     }
917     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
918     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
919 #else
920     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
921 #endif
922 #endif
923 #endif
924 #if PRINT_SYH
925     buffered_smsg_counter++;
926 #endif
927 }
928
929 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
930 {
931     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
932 }
933
934 inline
935 static void setup_smsg_connection(int destNode)
936 {
937     mdh_addr_list_t  *new_entry = 0;
938     gni_post_descriptor_t *pd;
939     gni_smsg_attr_t      *smsg_attr;
940     gni_return_t status = GNI_RC_NOT_DONE;
941     RDMA_REQUEST        *rdma_request_msg;
942     
943     if(smsg_available_slot == smsg_expand_slots)
944     {
945         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
946         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
947         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
948
949         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
950             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
951             GNI_MEM_READWRITE,   
952             -1,
953             &(new_entry->mdh));
954         smsg_available_slot = 0; 
955         new_entry->next = smsg_dynamic_list;
956         smsg_dynamic_list = new_entry;
957     }
958     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
959     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
960     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
961     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
962     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
963     smsg_attr->buff_size = smsg_memlen;
964     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
965     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
966     smsg_local_attr_vec[destNode] = smsg_attr;
967     smsg_available_slot++;
968     MallocPostDesc(pd);
969     pd->type            = GNI_POST_FMA_PUT;
970     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
971     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
972     pd->length          = sizeof(gni_smsg_attr_t);
973     pd->local_addr      = (uint64_t) smsg_attr;
974     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
975     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
976     pd->src_cq_hndl     = 0;
977     pd->rdma_mode       = 0;
978     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
979     print_smsg_attr(smsg_attr);
980     if(status == GNI_RC_ERROR_RESOURCE )
981     {
982         MallocRdmaRequest(rdma_request_msg);
983         rdma_request_msg->destNode = destNode;
984         rdma_request_msg->pd = pd;
985         /* buffer this request */
986     }
987 #if PRINT_SYH
988     if(status != GNI_RC_SUCCESS)
989        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
990     else
991         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
992 #endif
993 }
994
995 /* useDynamicSMSG */
996 inline 
997 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
998 {
999     gni_return_t status = GNI_RC_NOT_DONE;
1000
1001     if(mailbox_list->offset == mailbox_list->size)
1002     {
1003         dynamic_smsg_mailbox_t *new_mailbox_entry;
1004         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1005         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1006         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1007         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1008         new_mailbox_entry->offset = 0;
1009         
1010         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1011             new_mailbox_entry->size, smsg_rx_cqh,
1012             GNI_MEM_READWRITE,   
1013             -1,
1014             &(new_mailbox_entry->mem_hndl));
1015
1016         GNI_RC_CHECK("register", status);
1017         new_mailbox_entry->next = mailbox_list;
1018         mailbox_list = new_mailbox_entry;
1019     }
1020     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1021     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1022     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1023     local_smsg_attr->mbox_offset = mailbox_list->offset;
1024     mailbox_list->offset += smsg_memlen;
1025     local_smsg_attr->buff_size = smsg_memlen;
1026     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1027     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1028 }
1029
1030 /* useDynamicSMSG */
1031 inline 
1032 static int connect_to(int destNode)
1033 {
1034     gni_return_t status = GNI_RC_NOT_DONE;
1035     CmiAssert(smsg_connected_flag[destNode] == 0);
1036     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1037     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1038     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1039     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1040     
1041     CMI_GNI_LOCK
1042     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1043     CMI_GNI_UNLOCK
1044     if (status == GNI_RC_ERROR_RESOURCE) {
1045       /* possibly destNode is making connection at the same time */
1046       free(smsg_attr_vector_local[destNode]);
1047       smsg_attr_vector_local[destNode] = NULL;
1048       free(smsg_attr_vector_remote[destNode]);
1049       smsg_attr_vector_remote[destNode] = NULL;
1050       mailbox_list->offset -= smsg_memlen;
1051       return 0;
1052     }
1053     GNI_RC_CHECK("GNI_Post", status);
1054     smsg_connected_flag[destNode] = 1;
1055     return 1;
1056 }
1057
1058 inline 
1059 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
1060 {
1061     unsigned int          remote_address;
1062     uint32_t              remote_id;
1063     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1064     gni_smsg_attr_t       *smsg_attr;
1065     gni_post_descriptor_t *pd;
1066     gni_post_state_t      post_state;
1067     char                  *real_data; 
1068
1069     if (useDynamicSMSG) {
1070         switch (smsg_connected_flag[destNode]) {
1071         case 0: 
1072             connect_to(destNode);         /* continue to case 1 */
1073         case 1:                           /* pending connection, do nothing */
1074             status = GNI_RC_NOT_DONE;
1075             if(inbuff ==0)
1076                 buffer_small_msgs(queue, msg, size, destNode, tag);
1077             return status;
1078         }
1079     }
1080 #if CMK_SMP
1081 #if ! ONE_SEND_QUEUE
1082     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1083 #endif
1084     {
1085 #else
1086     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1087     {
1088 #endif
1089         CMI_GNI_LOCK
1090         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
1091         CMI_GNI_UNLOCK
1092         if(status == GNI_RC_SUCCESS)
1093         {
1094 #if CMK_SMP_TRACE_COMMTHREAD
1095             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1096             { 
1097                 START_EVENT();
1098                 if ( tag == SMALL_DATA_TAG)
1099                     real_data = (char*)msg; 
1100                 else 
1101                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1102                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1103             }
1104 #endif
1105             smsg_send_count ++;
1106             return status;
1107         }else
1108             status = GNI_RC_ERROR_RESOURCE;
1109     }
1110     if(inbuff ==0)
1111         buffer_small_msgs(queue, msg, size, destNode, tag);
1112     return status;
1113 }
1114
1115 inline 
1116 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1117 {
1118     /* construct a control message and send */
1119     CONTROL_MSG         *control_msg_tmp;
1120     MallocControlMsg(control_msg_tmp);
1121     control_msg_tmp->source_addr = (uint64_t)msg;
1122     control_msg_tmp->seq_id    = seqno;
1123     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1124 #if     USE_LRTS_MEMPOOL
1125     if(size < BIG_MSG)
1126     {
1127         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1128     }
1129     else
1130     {
1131         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1132         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1133         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1134     }
1135 #else
1136     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1137 #endif
1138     return control_msg_tmp;
1139 }
1140
1141 #define BLOCKING_SEND_CONTROL    0
1142
1143 // Large message, send control to receiver, receiver register memory and do a GET, 
1144 // return 1 - send no success
1145 inline
1146 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1147 {
1148     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1149     uint32_t            vmdh_index  = -1;
1150     int                 size;
1151     int                 offset = 0;
1152     uint64_t            source_addr;
1153     int                 register_size; 
1154     void                *msg;
1155
1156     size    =   control_msg_tmp->total_length;
1157     source_addr = control_msg_tmp->source_addr;
1158     register_size = control_msg_tmp->length;
1159
1160 #if  USE_LRTS_MEMPOOL
1161     if( control_msg_tmp->seq_id == 0 ){
1162 #if BLOCKING_SEND_CONTROL
1163         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1164             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1165                 LrtsAdvanceCommunication(0);
1166         }
1167 #endif
1168         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1169         {
1170             msg = (void*)source_addr;
1171             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1172             {
1173                 if(!inbuff)
1174                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1175                 return GNI_RC_ERROR_NOMEM;
1176             }
1177             //register the corresponding mempool
1178             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1179             if(status == GNI_RC_SUCCESS)
1180             {
1181                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1182             }
1183         }else
1184         {
1185             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1186             status = GNI_RC_SUCCESS;
1187         }
1188         if(NoMsgInSend( control_msg_tmp->source_addr))
1189             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1190         else
1191             register_size = 0;
1192     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1193     {
1194         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1195         source_addr += offset;
1196         size = control_msg_tmp->length;
1197 #if BLOCKING_SEND_CONTROL
1198         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1199             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1200                 LrtsAdvanceCommunication(0);
1201         }
1202 #endif
1203         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1204             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1205             {
1206                 if(!inbuff)
1207                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1208                 return GNI_RC_ERROR_NOMEM;
1209             }
1210             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1211             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1212         }
1213         else
1214         {
1215             status = GNI_RC_SUCCESS;
1216         }
1217         register_size = 0;  
1218     }
1219
1220     if(status == GNI_RC_SUCCESS)
1221     {
1222         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff);  
1223         if(status == GNI_RC_SUCCESS)
1224         {
1225             buffered_send_msg += register_size;
1226             if(control_msg_tmp->seq_id == 0)
1227             {
1228                 IncreaseMsgInSend(source_addr);
1229             }
1230             FreeControlMsg(control_msg_tmp);
1231             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1232         }else
1233             status = GNI_RC_ERROR_RESOURCE;
1234
1235     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1236     {
1237         CmiAbort("Memory registor for large msg\n");
1238     }else 
1239     {
1240         status = GNI_RC_ERROR_NOMEM; 
1241         if(!inbuff)
1242             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1243     }
1244     return status;
1245 #else
1246     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1247     if(status == GNI_RC_SUCCESS)
1248     {
1249         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0);  
1250         if(status == GNI_RC_SUCCESS)
1251         {
1252             FreeControlMsg(control_msg_tmp);
1253         }
1254     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1255     {
1256         CmiAbort("Memory registor for large msg\n");
1257     }else 
1258     {
1259         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1260     }
1261     return status;
1262 #endif
1263 }
1264
1265 inline void LrtsPrepareEnvelope(char *msg, int size)
1266 {
1267     CmiSetMsgSize(msg, size);
1268 }
1269
1270 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1271 {
1272     gni_return_t        status  =   GNI_RC_SUCCESS;
1273     uint8_t tag;
1274     CONTROL_MSG         *control_msg_tmp;
1275     int                 oob = ( mode & OUT_OF_BAND);
1276     SMSG_QUEUE          *queue;
1277
1278     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1279 #if CMK_USE_OOB
1280     queue = oob? &smsg_oob_queue : &smsg_queue;
1281 #else
1282     queue = &smsg_queue;
1283 #endif
1284
1285     LrtsPrepareEnvelope(msg, size);
1286
1287 #if PRINT_SYH
1288     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1289 #endif 
1290 #if CMK_SMP && COMM_THREAD_SEND
1291     if(size <= SMSG_MAX_MSG)
1292         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1293     else if (size < BIG_MSG) {
1294         control_msg_tmp =  construct_control_msg(size, msg, 0);
1295         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1296     }
1297     else {
1298           CmiSetMsgSeq(msg, 0);
1299           control_msg_tmp =  construct_control_msg(size, msg, 1);
1300           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1301     }
1302 #else   //non-smp, smp(worker sending)
1303     if(size <= SMSG_MAX_MSG)
1304     {
1305         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1306             CmiFree(msg);
1307     }
1308     else if (size < BIG_MSG) {
1309         control_msg_tmp =  construct_control_msg(size, msg, 0);
1310         send_large_messages(queue, destNode, control_msg_tmp, 0);
1311     }
1312     else {
1313 #if     USE_LRTS_MEMPOOL
1314         CmiSetMsgSeq(msg, 0);
1315         control_msg_tmp =  construct_control_msg(size, msg, 1);
1316         send_large_messages(queue, destNode, control_msg_tmp, 0);
1317 #else
1318         control_msg_tmp =  construct_control_msg(size, msg, 0);
1319         send_large_messages(queue, destNode, control_msg_tmp, 0);
1320 #endif
1321     }
1322 #endif
1323     return 0;
1324 }
1325
1326 static void    PumpDatagramConnection();
1327 static void registerUserTraceEvents() {
1328 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1329     traceRegisterUserEvent("setting up connections", 10);
1330     traceRegisterUserEvent("Receiving small msgs", 20);
1331     traceRegisterUserEvent("Release local transaction", 30);
1332     traceRegisterUserEvent("Sending buffered small msgs", 40);
1333     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1334 #endif
1335 }
1336
1337 static void ProcessDeadlock()
1338 {
1339     static CmiUInt8 *ptr = NULL;
1340     static CmiUInt8  last = 0, mysum, sum;
1341     static int count = 0;
1342     gni_return_t status;
1343     int i;
1344
1345 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1346 //sweep_mempool(CpvAccess(mempool));
1347     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1348     mysum = smsg_send_count + smsg_recv_count;
1349     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1350     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1351     GNI_RC_CHECK("PMI_Allgather", status);
1352     sum = 0;
1353     for (i=0; i<mysize; i++)  sum+= ptr[i];
1354     if (last == 0 || sum == last) 
1355         count++;
1356     else
1357         count = 0;
1358     last = sum;
1359     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1360     if (count == 2) { 
1361         /* detected twice, it is a real deadlock */
1362         if (myrank == 0)  {
1363             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1364             CmiAbort("Fatal> Deadlock detected.");
1365         }
1366
1367     }
1368     _detected_hang = 0;
1369 }
1370
1371 static void CheckProgress()
1372 {
1373     if (smsg_send_count == last_smsg_send_count &&
1374         smsg_recv_count == last_smsg_recv_count ) 
1375     {
1376         _detected_hang = 1;
1377 #if !CMK_SMP
1378         if (_detected_hang) ProcessDeadlock();
1379 #endif
1380
1381     }
1382     else {
1383         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1384         last_smsg_send_count = smsg_send_count;
1385         last_smsg_recv_count = smsg_recv_count;
1386         _detected_hang = 0;
1387     }
1388 }
1389
1390 static void set_limit()
1391 {
1392     //if (!user_set_flag && CmiMyRank() == 0) {
1393     if (CmiMyRank() == 0) {
1394         int mynode = CmiPhysicalNodeID(CmiMyPe());
1395         int numpes = CmiNumPesOnPhysicalNode(mynode);
1396         int numprocesses = numpes / CmiMyNodeSize();
1397         MAX_REG_MEM  = _totalmem / numprocesses;
1398         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1399         if (CmiMyPe() == 0)
1400            printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM, MAX_BUFF_SEND);
1401     }
1402 }
1403
1404 void LrtsPostCommonInit(int everReturn)
1405 {
1406 #if CMK_DIRECT
1407     CmiDirectInit();
1408 #endif
1409 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1410     CpvInitialize(double, projTraceStart);
1411     /* only PE 0 needs to care about registration (to generate sts file). */
1412     if (CmiMyPe() == 0) {
1413         registerMachineUserEventsFunction(&registerUserTraceEvents);
1414     }
1415 #endif
1416
1417 #if CMK_SMP
1418     CmiIdleState *s=CmiNotifyGetState();
1419     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1420     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1421 #else
1422     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1423     if (useDynamicSMSG)
1424     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1425 #endif
1426
1427     if (_checkProgress)
1428 #if CMK_SMP
1429     if (CmiMyRank() == 0)
1430 #endif
1431     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1432 #if !LARGEPAGE
1433     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1434 #endif
1435 }
1436
1437 /* this is called by worker thread */
1438 void LrtsPostNonLocal(){
1439 #if CMK_SMP
1440 #if MULTI_THREAD_SEND
1441     if(mysize == 1) return;
1442     PumpLocalRdmaTransactions();
1443 #if CMK_USE_OOB
1444     if (SendBufferMsg(&smsg_oob_queue) == 1)
1445 #endif
1446     SendBufferMsg(&smsg_queue);
1447     SendRdmaMsg();
1448 #endif
1449 #endif
1450 }
1451
1452 /* useDynamicSMSG */
1453 static void    PumpDatagramConnection()
1454 {
1455     uint32_t          remote_address;
1456     uint32_t          remote_id;
1457     gni_return_t status;
1458     gni_post_state_t  post_state;
1459     uint64_t          datagram_id;
1460     int i;
1461
1462    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1463    {
1464        if (datagram_id >= mysize) {           /* bound endpoint */
1465            int pe = datagram_id - mysize;
1466            CMI_GNI_LOCK
1467            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1468            CMI_GNI_UNLOCK
1469            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1470            {
1471                CmiAssert(remote_id == pe);
1472                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1473                GNI_RC_CHECK("Dynamic SMSG Init", status);
1474 #if PRINT_SYH
1475                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1476 #endif
1477                CmiAssert(smsg_connected_flag[pe] == 1);
1478                smsg_connected_flag[pe] = 2;
1479            }
1480        }
1481        else {         /* unbound ep */
1482            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1483            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1484            {
1485                CmiAssert(remote_id<mysize);
1486                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1487                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1488                GNI_RC_CHECK("Dynamic SMSG Init", status);
1489 #if PRINT_SYH
1490                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1491 #endif
1492                smsg_connected_flag[remote_id] = 2;
1493
1494                alloc_smsg_attr(&send_smsg_attr);
1495                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1496                GNI_RC_CHECK("post unbound datagram", status);
1497            }
1498        }
1499    }
1500 }
1501
1502 /* pooling CQ to receive network message */
1503 static void PumpNetworkRdmaMsgs()
1504 {
1505     gni_cq_entry_t      event_data;
1506     gni_return_t        status;
1507
1508 }
1509
1510 inline 
1511 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1512 {
1513     RDMA_REQUEST        *rdma_request_msg;
1514     MallocRdmaRequest(rdma_request_msg);
1515     rdma_request_msg->destNode = inst_id;
1516     rdma_request_msg->pd = pd;
1517 #if CMK_SMP
1518     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1519 #else
1520     if(sendRdmaBuf == 0)
1521     {
1522         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1523     }else{
1524         sendRdmaTail->next = rdma_request_msg;
1525         sendRdmaTail =  rdma_request_msg;
1526     }
1527 #endif
1528
1529 }
1530 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1531 static void PumpNetworkSmsg()
1532 {
1533     uint64_t            inst_id;
1534     int                 ret;
1535     gni_cq_entry_t      event_data;
1536     gni_return_t        status, status2;
1537     void                *header;
1538     uint8_t             msg_tag;
1539     int                 msg_nbytes;
1540     void                *msg_data;
1541     gni_mem_handle_t    msg_mem_hndl;
1542     gni_smsg_attr_t     *smsg_attr;
1543     gni_smsg_attr_t     *remote_smsg_attr;
1544     int                 init_flag;
1545     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1546     uint64_t            source_addr;
1547     SMSG_QUEUE         *queue = &smsg_queue;
1548 #if     CMK_DIRECT
1549     cmidirectMsg        *direct_msg;
1550 #endif
1551     while(1)
1552     {
1553         CMI_GNI_LOCK
1554         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1555         CMI_GNI_UNLOCK
1556         if(status != GNI_RC_SUCCESS)
1557             break;
1558         inst_id = GNI_CQ_GET_INST_ID(event_data);
1559         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1560 #if PRINT_SYH
1561         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1562 #endif
1563         if (useDynamicSMSG) {
1564             /* subtle: smsg may come before connection is setup */
1565             while (smsg_connected_flag[inst_id] != 2) 
1566                PumpDatagramConnection();
1567         }
1568         msg_tag = GNI_SMSG_ANY_TAG;
1569         while(1) {
1570             CMI_GNI_LOCK
1571             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1572             CMI_GNI_UNLOCK
1573             if (status != GNI_RC_SUCCESS)
1574                 break;
1575 #if PRINT_SYH
1576             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1577 #endif
1578             /* copy msg out and then put into queue (small message) */
1579             switch (msg_tag) {
1580             case SMALL_DATA_TAG:
1581             {
1582                 START_EVENT();
1583                 msg_nbytes = CmiGetMsgSize(header);
1584                 msg_data    = CmiAlloc(msg_nbytes);
1585                 memcpy(msg_data, (char*)header, msg_nbytes);
1586 #if CMK_SMP_TRACE_COMMTHREAD
1587                 TRACE_COMM_RECV(CpvAccess(projTraceStart), msg_data);
1588 #endif
1589                 handleOneRecvedMsg(msg_nbytes, msg_data);
1590                 break;
1591             }
1592             case LMSG_INIT_TAG:
1593             {
1594                 getLargeMsgRequest(header, inst_id);
1595                 break;
1596             }
1597             case ACK_TAG:   //msg fit into mempool
1598             {
1599                 /* Get is done, release message . Now put is not used yet*/
1600                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1601 #if ! USE_LRTS_MEMPOOL
1602                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1603 #else
1604                 DecreaseMsgInSend(msg);
1605 #endif
1606                 if(NoMsgInSend(msg))
1607                     buffered_send_msg -= GetMempoolsize(msg);
1608                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1609                 CmiFree(msg);
1610                 break;
1611             }
1612             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1613             {
1614                 header_tmp = (CONTROL_MSG *) header;
1615                 void *msg = (void*)(header_tmp->source_addr);
1616                 int cur_seq = CmiGetMsgSeq(msg);
1617                 int offset = ONE_SEG*(cur_seq+1);
1618                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1619                 buffered_send_msg -= header_tmp->length;
1620                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1621                 if (remain_size < 0) remain_size = 0;
1622                 CmiSetMsgSize(msg, remain_size);
1623                 if(remain_size <= 0) //transaction done
1624                 {
1625                     CmiFree(msg);
1626                 }else if (header_tmp->total_length > offset)
1627                 {
1628                     CmiSetMsgSeq(msg, cur_seq+1);
1629                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1630                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1631                     //send next seg
1632                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1633                          // pipelining
1634                     if (header_tmp->seq_id == 1) {
1635                       int i;
1636                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1637                         int seq = cur_seq+i+2;
1638                         CmiSetMsgSeq(msg, seq-1);
1639                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1640                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1641                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1642                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1643                       }
1644                     }
1645                 }
1646                 break;
1647             }
1648 #if CMK_PERSISTENT_COMM
1649             case PUT_DONE_TAG: //persistent message
1650                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1651                 int size = ((CONTROL_MSG *) header)->length;
1652                 CmiReference(msg);
1653                 handleOneRecvedMsg(size, msg); 
1654                 break;
1655 #endif
1656 #if CMK_DIRECT
1657             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1658                 //create a trigger message
1659                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
1660                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
1661                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
1662                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
1663                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1664                 break;
1665 #endif
1666             default: {
1667                 printf("weird tag problem\n");
1668                 CmiAbort("Unknown tag\n");
1669                      }
1670             }               // end switch
1671 #if PRINT_SYH
1672             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1673 #endif
1674             CMI_GNI_LOCK
1675             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1676             CMI_GNI_UNLOCK
1677             smsg_recv_count ++;
1678             msg_tag = GNI_SMSG_ANY_TAG;
1679         } //endwhile getNext
1680     }   //end while GetEvent
1681     if(status == GNI_RC_ERROR_RESOURCE)
1682     {
1683         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
1684         GNI_RC_CHECK("Smsg_rx_cq full", status);
1685     }
1686 }
1687
1688 static void printDesc(gni_post_descriptor_t *pd)
1689 {
1690     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1691 }
1692
1693 // for BIG_MSG called on receiver side for receiving control message
1694 // LMSG_INIT_TAG
1695 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1696 {
1697 #if     USE_LRTS_MEMPOOL
1698     CONTROL_MSG         *request_msg;
1699     gni_return_t        status = GNI_RC_SUCCESS;
1700     void                *msg_data;
1701     gni_post_descriptor_t *pd;
1702     gni_mem_handle_t    msg_mem_hndl;
1703     int source, size, transaction_size, offset = 0;
1704     int     register_size = 0;
1705
1706     // initial a get to transfer data from the sender side */
1707     request_msg = (CONTROL_MSG *) header;
1708     size = request_msg->total_length;
1709     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1710     if(request_msg->seq_id < 2)   {
1711         msg_data = CmiAlloc(size);
1712         CmiSetMsgSeq(msg_data, 0);
1713         _MEMCHECK(msg_data);
1714     }
1715     else {
1716         offset = ONE_SEG*(request_msg->seq_id-1);
1717         msg_data = (char*)request_msg->dest_addr + offset;
1718     }
1719    
1720     MallocPostDesc(pd);
1721     pd->cqwrite_value = request_msg->seq_id;
1722     if( request_msg->seq_id == 0)
1723     {
1724         pd->local_mem_hndl= GetMemHndl(msg_data);
1725         transaction_size = ALIGN64(size);
1726         if(IsMemHndlZero(pd->local_mem_hndl))
1727         {   
1728             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1729             if(status == GNI_RC_SUCCESS)
1730             {
1731                 pd->local_mem_hndl = GetMemHndl(msg_data);
1732             }
1733             else
1734             {
1735                 SetMemHndlZero(pd->local_mem_hndl);
1736             }
1737         }
1738         if(NoMsgInRecv( (void*)(msg_data)))
1739             register_size = GetMempoolsize((void*)(msg_data));
1740         else
1741             register_size = 0;
1742     }
1743     else{
1744         transaction_size = ALIGN64(request_msg->length);
1745         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1746         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1747         {
1748             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1749         }
1750     }
1751     pd->first_operand = ALIGN64(size);                   //  total length
1752
1753     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1754         pd->type            = GNI_POST_FMA_GET;
1755     else
1756         pd->type            = GNI_POST_RDMA_GET;
1757 #if REMOTE_EVENT
1758     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1759 #else
1760     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1761 #endif
1762     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1763     pd->length          = transaction_size;
1764     pd->local_addr      = (uint64_t) msg_data;
1765     pd->remote_addr     = request_msg->source_addr + offset;
1766     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1767     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1768     pd->rdma_mode       = 0;
1769     pd->amo_cmd         = 0;
1770
1771     //memory registration success
1772     if(status == GNI_RC_SUCCESS)
1773     {
1774         CMI_GNI_LOCK
1775         if(pd->type == GNI_POST_RDMA_GET) 
1776             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1777         else
1778             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1779         CMI_GNI_UNLOCK
1780
1781         if(status == GNI_RC_SUCCESS )
1782         {
1783             if(pd->cqwrite_value == 0)
1784             {
1785 #if MACHINE_DEBUG_LOG
1786                 buffered_recv_msg += register_size;
1787                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1788 #endif
1789                 IncreaseMsgInRecv(msg_data);
1790             }
1791         }
1792     }else
1793     {
1794         SetMemHndlZero((pd->local_mem_hndl));
1795     }
1796     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1797     {
1798         bufferRdmaMsg(inst_id, pd); 
1799     }else {
1800          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1801         GNI_RC_CHECK("GetLargeAFter posting", status);
1802     }
1803 #else
1804     CONTROL_MSG         *request_msg;
1805     gni_return_t        status;
1806     void                *msg_data;
1807     gni_post_descriptor_t *pd;
1808     RDMA_REQUEST        *rdma_request_msg;
1809     gni_mem_handle_t    msg_mem_hndl;
1810     //int source;
1811     // initial a get to transfer data from the sender side */
1812     request_msg = (CONTROL_MSG *) header;
1813     msg_data = CmiAlloc(request_msg->length);
1814     _MEMCHECK(msg_data);
1815
1816     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1817
1818     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1819     {
1820         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1821     }
1822
1823     MallocPostDesc(pd);
1824     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1825         pd->type            = GNI_POST_FMA_GET;
1826     else
1827         pd->type            = GNI_POST_RDMA_GET;
1828 #if REMOTE_EVENT
1829     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1830 #else
1831     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1832 #endif
1833     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1834     pd->length          = ALIGN64(request_msg->length);
1835     pd->local_addr      = (uint64_t) msg_data;
1836     pd->remote_addr     = request_msg->source_addr;
1837     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1838     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1839     pd->rdma_mode       = 0;
1840     pd->amo_cmd         = 0;
1841
1842     //memory registration successful
1843     if(status == GNI_RC_SUCCESS)
1844     {
1845         pd->local_mem_hndl  = msg_mem_hndl;
1846         CMI_GNI_LOCK
1847         if(pd->type == GNI_POST_RDMA_GET) 
1848             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1849         else
1850             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1851         CMI_GNI_UNLOCK
1852     }else
1853     {
1854         SetMemHndlZero(pd->local_mem_hndl);
1855     }
1856     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1857     {
1858         MallocRdmaRequest(rdma_request_msg);
1859         rdma_request_msg->next = 0;
1860         rdma_request_msg->destNode = inst_id;
1861         rdma_request_msg->pd = pd;
1862         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1863     }else {
1864         GNI_RC_CHECK("AFter posting", status);
1865     }
1866 #endif
1867 }
1868
1869 static void PumpLocalRdmaTransactions()
1870 {
1871     gni_cq_entry_t          ev;
1872     gni_return_t            status;
1873     uint64_t                type, inst_id;
1874     gni_post_descriptor_t   *tmp_pd;
1875     MSG_LIST                *ptr;
1876     CONTROL_MSG             *ack_msg_tmp;
1877     ACK_MSG                 *ack_msg;
1878     uint8_t                 msg_tag;
1879 #if CMK_DIRECT
1880     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1881 #endif
1882     SMSG_QUEUE         *queue = &smsg_queue;
1883
1884     while(1) {
1885         CMI_GNI_LOCK 
1886         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1887         CMI_GNI_UNLOCK
1888         if(status != GNI_RC_SUCCESS) break;
1889         
1890         type = GNI_CQ_GET_TYPE(ev);
1891         if (type == GNI_CQ_EVENT_TYPE_POST)
1892         {
1893             inst_id     = GNI_CQ_GET_INST_ID(ev);
1894 #if PRINT_SYH
1895             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1896 #endif
1897             CMI_GNI_LOCK
1898             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1899             CMI_GNI_UNLOCK
1900
1901             switch (tmp_pd->type) {
1902 #if CMK_PERSISTENT_COMM || CMK_DIRECT
1903             case GNI_POST_RDMA_PUT:
1904 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
1905                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1906 #endif
1907             case GNI_POST_FMA_PUT:
1908                 if(tmp_pd->amo_cmd == 1) {
1909                     //sender ACK to receiver to trigger it is done
1910                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1911                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
1912                     msg_tag = DIRECT_PUT_DONE_TAG;
1913                 }
1914                 else {
1915                     CmiFree((void *)tmp_pd->local_addr);
1916                     MallocControlMsg(ack_msg_tmp);
1917                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1918                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1919                     msg_tag = PUT_DONE_TAG;
1920                 }
1921                 break;
1922 #endif
1923             case GNI_POST_RDMA_GET:
1924             case GNI_POST_FMA_GET:  {
1925 #if  ! USE_LRTS_MEMPOOL
1926                 MallocControlMsg(ack_msg_tmp);
1927                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1928                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1929                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1930                 msg_tag = ACK_TAG;  
1931 #else
1932                 int seq_id = tmp_pd->cqwrite_value;
1933                 if(seq_id > 0)      // BIG_MSG
1934                 {
1935                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1936                     MallocControlMsg(ack_msg_tmp);
1937                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1938                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1939                     ack_msg_tmp->seq_id = seq_id;
1940                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1941                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1942                     ack_msg_tmp->length = tmp_pd->length;
1943                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1944                     msg_tag = BIG_MSG_TAG; 
1945                 } 
1946                 else
1947                 {
1948                     MallocAckMsg(ack_msg);
1949                     ack_msg->source_addr = tmp_pd->remote_addr;
1950                     msg_tag = ACK_TAG;  
1951                     // ack_msg_tmp->dest_addr = tmp_pd->local_addr; ???
1952                 }
1953 #endif
1954                 break;
1955             }
1956             default:
1957                 CmiPrintf("type=%d\n", tmp_pd->type);
1958                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1959             }      /* end of switch */
1960
1961 #if CMK_DIRECT
1962             if (tmp_pd->amo_cmd == 1) {
1963                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1964                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
1965             }
1966             else
1967 #endif
1968             if (msg_tag == ACK_TAG) {
1969                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
1970                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
1971             }
1972             else {
1973                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
1974                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
1975             }
1976 #if CMK_PERSISTENT_COMM
1977             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1978 #endif
1979             {
1980                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1981 #if PRINT_SYH
1982                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1983 #endif
1984 #if CMK_SMP_TRACE_COMMTHREAD
1985                     START_EVENT();
1986 #endif
1987                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1988                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1989 #if MACHINE_DEBUG_LOG
1990                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1991                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1992                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1993 #endif
1994 #if CMK_SMP_TRACE_COMMTHREAD
1995                     TRACE_COMM_RECV(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1996 #endif
1997                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1998                 }else if(msg_tag == BIG_MSG_TAG){
1999                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2000                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2001                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2002 #if CMK_SMP_TRACE_COMMTHREAD
2003                         START_EVENT();
2004 #endif
2005 #if PRINT_SYH
2006                         printf("Pipeline msg done [%d]\n", myrank);
2007 #endif
2008 #if CMK_SMP_TRACE_COMMTHREAD
2009                         TRACE_COMM_RECV(CpvAccess(projTraceStart), msg);
2010 #endif
2011                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2012                     }
2013                 }
2014             }
2015             FreePostDesc(tmp_pd);
2016         }
2017     } //end while
2018     if(status == GNI_RC_ERROR_RESOURCE)
2019     {
2020         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2021         GNI_RC_CHECK("Smsg_tx_cq full", status);
2022     }
2023 }
2024
2025 static void  SendRdmaMsg()
2026 {
2027     gni_return_t            status = GNI_RC_SUCCESS;
2028     gni_mem_handle_t        msg_mem_hndl;
2029     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2030     RDMA_REQUEST            *pre = 0;
2031     int i, register_size = 0;
2032     void                    *msg;
2033 #if CMK_SMP
2034     int len = PCQueueLength(sendRdmaBuf);
2035     for (i=0; i<len; i++)
2036     {
2037         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2038         if (ptr == NULL) break;
2039 #else
2040     ptr = sendRdmaBuf;
2041     while (ptr!=0)
2042     {
2043 #endif 
2044         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2045         gni_post_descriptor_t *pd = ptr->pd;
2046         status = GNI_RC_SUCCESS;
2047         
2048         if(pd->cqwrite_value == 0)
2049         {
2050             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2051             {
2052                 msg = (void*)(pd->local_addr);
2053                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
2054                 if(status == GNI_RC_SUCCESS)
2055                 {
2056                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2057                 }
2058             }else
2059             {
2060                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2061             }
2062             if(NoMsgInRecv( (void*)(pd->local_addr)))
2063                 register_size = GetMempoolsize((void*)(pd->local_addr));
2064             else
2065                 register_size = 0;
2066         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2067         {
2068             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
2069         }
2070         if(status == GNI_RC_SUCCESS)        //mem register good
2071         {
2072             CMI_GNI_LOCK
2073             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2074                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2075             else
2076                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2077             CMI_GNI_UNLOCK
2078             if(status == GNI_RC_SUCCESS)    //post good
2079             {
2080 #if !CMK_SMP
2081                 tmp_ptr = ptr;
2082                 if(pre != 0) {
2083                     pre->next = ptr->next;
2084                 }
2085                 else {
2086                     sendRdmaBuf = ptr->next;
2087                 }
2088                 ptr = ptr->next;
2089                 FreeRdmaRequest(tmp_ptr);
2090 #endif
2091                 if(pd->cqwrite_value == 0)
2092                 {
2093                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2094                 }
2095 #if MACHINE_DEBUG_LOG
2096                 buffered_recv_msg += register_size;
2097                 MACHSTATE(8, "GO request from buffered\n"); 
2098 #endif
2099             }else           // cannot post
2100             {
2101 #if CMK_SMP
2102                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2103 #else
2104                 pre = ptr;
2105                 ptr = ptr->next;
2106 #endif
2107                 break;
2108             }
2109         } else          //memory registration fails
2110         {
2111 #if CMK_SMP
2112             PCQueuePush(sendRdmaBuf, (char*)ptr);
2113 #else
2114             pre = ptr;
2115             ptr = ptr->next;
2116 #endif
2117         }
2118     } //end while
2119 #if ! CMK_SMP
2120     if(ptr == 0)
2121         sendRdmaTail = pre;
2122 #endif
2123 }
2124
2125 // return 1 if all messages are sent
2126 static int SendBufferMsg(SMSG_QUEUE *queue)
2127 {
2128     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2129     CONTROL_MSG         *control_msg_tmp;
2130     gni_return_t        status;
2131     int                 done = 1;
2132     int                 register_size;
2133     void                *register_addr;
2134     int                 index_previous = -1;
2135 #if CMI_EXERT_SEND_CAP
2136     int                 sent_cnt = 0;
2137 #endif
2138
2139 #if CMK_SMP
2140     int          index = 0;
2141 #if ONE_SEND_QUEUE
2142     memset(destpe_avail, 0, mysize * sizeof(char));
2143     for (index=0; index<1; index++)
2144     {
2145         int i, len = PCQueueLength(queue->sendMsgBuf);
2146         for (i=0; i<len; i++) 
2147         {
2148             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2149             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2150                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2151                 continue;
2152             }
2153 #else
2154 #if SMP_LOCKS
2155     int nonempty = PCQueueLength(nonEmptyQueues);
2156     for(index =0; index<nonempty; index++)
2157     {
2158         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2159         PCQueue current_queue= current_list-> sendSmsgBuf;
2160         CmiLock(current_list->lock);
2161         int i, len = PCQueueLength(current_queue);
2162         current_list->pushed = 0;
2163         CmiUnlock(current_list->lock);
2164 #else
2165     for(index =0; index<mysize; index++)
2166     {
2167         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2168         int i, len = PCQueueLength(current_queue);
2169 #endif
2170         for (i=0; i<len; i++) 
2171         {
2172             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2173             if (ptr == 0) break;
2174 #endif
2175 #else
2176     int index = queue->smsg_head_index;
2177     while(index != -1)
2178     {
2179         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2180         pre = 0;
2181         while(ptr != 0)
2182         {
2183 #endif
2184             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2185             status = GNI_RC_ERROR_RESOURCE;
2186             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2187                 /* connection not exists yet */
2188             }
2189             else
2190             switch(ptr->tag)
2191             {
2192             case SMALL_DATA_TAG:
2193                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2194                 if(status == GNI_RC_SUCCESS)
2195                 {
2196                     CmiFree(ptr->msg);
2197                 }
2198                 break;
2199             case LMSG_INIT_TAG:
2200                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2201                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2202                 break;
2203             case ACK_TAG:
2204                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2205                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2206                 break;
2207             case BIG_MSG_TAG:
2208                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2209                 if(status == GNI_RC_SUCCESS)
2210                 {
2211                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2212                 }
2213                 break;
2214 #if CMK_DIRECT
2215             case DIRECT_PUT_DONE_TAG:
2216                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2217                 if(status == GNI_RC_SUCCESS)
2218                 {
2219                     free((CMK_DIRECT_HEADER*)ptr->msg);
2220                 }
2221                 break;
2222
2223 #endif
2224             default:
2225                 printf("Weird tag\n");
2226                 CmiAbort("should not happen\n");
2227             }       // end switch
2228             if(status == GNI_RC_SUCCESS)
2229             {
2230 #if !CMK_SMP
2231                 tmp_ptr = ptr;
2232                 if(pre)
2233                 {
2234                     ptr = pre ->next = ptr->next;
2235                 }else
2236                 {
2237                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2238                 }
2239                 FreeMsgList(tmp_ptr);
2240 #else
2241                 FreeMsgList(ptr);
2242 #endif
2243 #if PRINT_SYH
2244                 buffered_smsg_counter--;
2245                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2246 #endif
2247 #if CMI_EXERT_SEND_CAP
2248                 sent_cnt++;
2249                 if(sent_cnt == SEND_CAP)
2250                     break;
2251 #endif
2252             }else {
2253 #if CMK_SMP
2254 #if ONE_SEND_QUEUE
2255                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2256 #else
2257                 PCQueuePush(current_queue, (char*)ptr);
2258 #endif
2259 #else
2260                 pre = ptr;
2261                 ptr=ptr->next;
2262 #endif
2263                 done = 0;
2264                 if(status == GNI_RC_ERROR_RESOURCE)
2265                 {
2266 #if CMK_SMP && ONE_SEND_QUEUE 
2267                     destpe_avail[ptr->destNode] = 1;
2268 #else
2269                     break;
2270 #endif
2271                 }
2272             } 
2273         } //end while
2274 #if !CMK_SMP
2275         if(ptr == 0)
2276             queue->smsg_msglist_index[index].tail = pre;
2277         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2278         {
2279             if(index_previous != -1)
2280                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2281             else
2282                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2283         }
2284         else {
2285             index_previous = index;
2286         }
2287         index = queue->smsg_msglist_index[index].next;
2288 #else
2289 #if !ONE_SEND_QUEUE && SMP_LOCKS
2290         CmiLock(current_list->lock);
2291         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2292         {
2293             current_list->pushed = 1;
2294             PCQueuePush(nonEmptyQueues, current_list);
2295         }
2296         CmiUnlock(current_list->lock); 
2297 #endif
2298 #endif
2299
2300 #if CMI_EXERT_SEND_CAP
2301         if(sent_cnt == SEND_CAP)
2302                 break;
2303 #endif
2304     }   // end pooling for all cores
2305     return done;
2306 }
2307
2308 static void ProcessDeadlock();
2309 void LrtsAdvanceCommunication(int whileidle)
2310 {
2311     /*  Receive Msg first */
2312 #if CMK_SMP_TRACE_COMMTHREAD
2313     double startT, endT;
2314 #endif
2315     if (useDynamicSMSG && whileidle)
2316     {
2317 #if CMK_SMP_TRACE_COMMTHREAD
2318         startT = CmiWallTimer();
2319 #endif
2320         PumpDatagramConnection();
2321 #if CMK_SMP_TRACE_COMMTHREAD
2322         endT = CmiWallTimer();
2323         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2324 #endif
2325     }
2326
2327 #if CMK_SMP_TRACE_COMMTHREAD
2328     startT = CmiWallTimer();
2329 #endif
2330     PumpNetworkSmsg();
2331     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2332 #if CMK_SMP_TRACE_COMMTHREAD
2333     endT = CmiWallTimer();
2334     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2335 #endif
2336
2337 #if CMK_SMP_TRACE_COMMTHREAD
2338     startT = CmiWallTimer();
2339 #endif
2340     PumpLocalRdmaTransactions();
2341     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2342 #if CMK_SMP_TRACE_COMMTHREAD
2343     endT = CmiWallTimer();
2344     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2345 #endif
2346     /* Send buffered Message */
2347 #if CMK_SMP_TRACE_COMMTHREAD
2348     startT = CmiWallTimer();
2349 #endif
2350 #if CMK_USE_OOB
2351     if (SendBufferMsg(&smsg_oob_queue) == 1)
2352 #endif
2353     SendBufferMsg(&smsg_queue);
2354     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2355 #if CMK_SMP_TRACE_COMMTHREAD
2356     endT = CmiWallTimer();
2357     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2358 #endif
2359
2360 #if CMK_SMP_TRACE_COMMTHREAD
2361     startT = CmiWallTimer();
2362 #endif
2363     SendRdmaMsg();
2364     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2365 #if CMK_SMP_TRACE_COMMTHREAD
2366     endT = CmiWallTimer();
2367     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2368 #endif
2369
2370 #if CMK_SMP
2371     if (_detected_hang)  ProcessDeadlock();
2372 #endif
2373 }
2374
2375 /* useDynamicSMSG */
2376 static void _init_dynamic_smsg()
2377 {
2378     gni_return_t status;
2379     uint32_t     vmdh_index = -1;
2380     int i;
2381
2382     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2383     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2384     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2385     for(i=0; i<mysize; i++) {
2386         smsg_connected_flag[i] = 0;
2387         smsg_attr_vector_local[i] = NULL;
2388         smsg_attr_vector_remote[i] = NULL;
2389     }
2390     if(mysize <=512)
2391     {
2392         SMSG_MAX_MSG = 4096;
2393     }else if (mysize <= 4096)
2394     {
2395         SMSG_MAX_MSG = 4096/mysize * 1024;
2396     }else if (mysize <= 16384)
2397     {
2398         SMSG_MAX_MSG = 512;
2399     }else {
2400         SMSG_MAX_MSG = 256;
2401     }
2402
2403     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2404     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2405     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2406     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2407     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2408
2409     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2410     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2411     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2412     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2413     mailbox_list->offset = 0;
2414     mailbox_list->next = 0;
2415     
2416     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2417         mailbox_list->size, smsg_rx_cqh,
2418         GNI_MEM_READWRITE,   
2419         vmdh_index,
2420         &(mailbox_list->mem_hndl));
2421     GNI_RC_CHECK("MEMORY registration for smsg", status);
2422
2423     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2424     GNI_RC_CHECK("Unbound EP", status);
2425     
2426     alloc_smsg_attr(&send_smsg_attr);
2427
2428     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2429     GNI_RC_CHECK("post unbound datagram", status);
2430
2431       /* always pre-connect to proc 0 */
2432     //if (myrank != 0) connect_to(0);
2433 }
2434
2435 static void _init_static_smsg()
2436 {
2437     gni_smsg_attr_t      *smsg_attr;
2438     gni_smsg_attr_t      remote_smsg_attr;
2439     gni_smsg_attr_t      *smsg_attr_vec;
2440     gni_mem_handle_t     my_smsg_mdh_mailbox;
2441     int      ret, i;
2442     gni_return_t status;
2443     uint32_t              vmdh_index = -1;
2444     mdh_addr_t            base_infor;
2445     mdh_addr_t            *base_addr_vec;
2446     char *env;
2447
2448     if(mysize <=512)
2449     {
2450         SMSG_MAX_MSG = 1024;
2451     }else if (mysize <= 4096)
2452     {
2453         SMSG_MAX_MSG = 1024;
2454     }else if (mysize <= 16384)
2455     {
2456         SMSG_MAX_MSG = 512;
2457     }else {
2458         SMSG_MAX_MSG = 256;
2459     }
2460     
2461     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2462     if (env) SMSG_MAX_MSG = atoi(env);
2463     CmiAssert(SMSG_MAX_MSG > 0);
2464
2465     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2466     
2467     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2468     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2469     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2470     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2471     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2472     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2473     CmiAssert(ret == 0);
2474     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2475     
2476     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2477             smsg_memlen*(mysize), smsg_rx_cqh,
2478             GNI_MEM_READWRITE,   
2479             vmdh_index,
2480             &my_smsg_mdh_mailbox);
2481     register_memory_size += smsg_memlen*(mysize);
2482     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2483
2484     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2485     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2486
2487     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2488     base_infor.mdh =  my_smsg_mdh_mailbox;
2489     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2490
2491     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2492  
2493     for(i=0; i<mysize; i++)
2494     {
2495         if(i==myrank)
2496             continue;
2497         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2498         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2499         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2500         smsg_attr[i].mbox_offset = i*smsg_memlen;
2501         smsg_attr[i].buff_size = smsg_memlen;
2502         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2503         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2504     }
2505
2506     for(i=0; i<mysize; i++)
2507     {
2508         if (myrank == i) continue;
2509
2510         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2511         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2512         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2513         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2514         remote_smsg_attr.buff_size = smsg_memlen;
2515         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2516         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2517
2518         /* initialize the smsg channel */
2519         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2520         GNI_RC_CHECK("SMSG Init", status);
2521     } //end initialization
2522
2523     free(base_addr_vec);
2524     free(smsg_attr);
2525
2526     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2527     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2528
2529
2530 inline
2531 static void _init_send_queue(SMSG_QUEUE *queue)
2532 {
2533      int i;
2534 #if ONE_SEND_QUEUE
2535      queue->sendMsgBuf = PCQueueCreate();
2536      destpe_avail = (char*)malloc(mysize * sizeof(char));
2537 #else
2538      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2539 #if CMK_SMP && SMP_LOCKS
2540      nonEmptyQueues = PCQueueCreate();
2541 #endif
2542      for(i =0; i<mysize; i++)
2543      {
2544 #if CMK_SMP
2545          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2546 #if SMP_LOCKS
2547          queue->smsg_msglist_index[i].pushed = 0;
2548          queue->smsg_msglist_index[i].lock = CmiCreateLock();
2549 #endif
2550 #else
2551          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2552          queue->smsg_msglist_index[i].next = -1;
2553          queue->smsg_head_index = -1;
2554 #endif
2555         
2556      }
2557 #endif
2558 }
2559
2560 inline
2561 static void _init_smsg()
2562 {
2563     if(mysize > 1) {
2564         if (useDynamicSMSG)
2565             _init_dynamic_smsg();
2566         else
2567             _init_static_smsg();
2568     }
2569
2570     _init_send_queue(&smsg_queue);
2571 #if CMK_USE_OOB
2572     _init_send_queue(&smsg_oob_queue);
2573 #endif
2574 }
2575
2576 static void _init_static_msgq()
2577 {
2578     gni_return_t status;
2579     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2580     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2581     msgq_attrs.smsg_q_sz = 1;
2582     msgq_attrs.rcv_pool_sz = 1;
2583     msgq_attrs.num_msgq_eps = 2;
2584     msgq_attrs.nloc_insts = 8;
2585     msgq_attrs.modes = 0;
2586     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2587
2588     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2589     GNI_RC_CHECK("MSGQ Init", status);
2590
2591
2592 }
2593
2594 #if CMK_SMP && STEAL_MEMPOOL
2595 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2596 {
2597     void *pool = NULL;
2598     int i, k;
2599     // check other ranks
2600     for (k=0; k<CmiMyNodeSize()+1; k++) {
2601         i = (CmiMyRank()+k)%CmiMyNodeSize();
2602         if (i==CmiMyRank()) continue;
2603         mempool_type *mptr = CpvAccessOther(mempool, i);
2604         CmiLock(mptr->mempoolLock);
2605         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2606         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2607             CmiUnlock(mptr->mempoolLock);
2608             continue;
2609         }
2610         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2611         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2612             /* search in the free list */
2613           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2614           mempool_header *current = free_header;
2615           while (current) {
2616             if (current->next_free == (char*)header-(char*)mptr) break;
2617             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2618           }
2619           if (current == NULL) {         /* not found in free list */
2620             CmiUnlock(mptr->mempoolLock);
2621             continue;
2622           }
2623 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2624             /* search the previous memblock, and remove the tail */
2625           mempool_block *ptr = (mempool_block *)mptr;
2626           while (ptr) {
2627             if (ptr->memblock_next == mptr->memblock_tail) break;
2628             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2629           }
2630           CmiAssert(ptr!=NULL);
2631           ptr->memblock_next = 0;
2632           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2633
2634             /* remove memblock from the free list */
2635           current->next_free = header->next_free;
2636           if (header == free_header) mptr->freelist_head = header->next_free;
2637
2638           CmiUnlock(mptr->mempoolLock);
2639
2640           pool = (void*)tail;
2641           *mem_hndl = tail->mem_hndl;
2642           *size = tail->size;
2643           return pool;
2644         }
2645         CmiUnlock(mptr->mempoolLock);
2646     }
2647
2648       /* steal failed, deregister and free memblock now */
2649     int freed = 0;
2650     for (k=0; k<CmiMyNodeSize()+1; k++) {
2651         i = (CmiMyRank()+k)%CmiMyNodeSize();
2652         mempool_type *mptr = CpvAccessOther(mempool, i);
2653         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2654
2655         mempool_block *mempools_head = &(mptr->mempools_head);
2656         mempool_block *current = mempools_head;
2657         mempool_block *prev = NULL;
2658
2659         while (current) {
2660           int isfree = 0;
2661           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2662 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2663           mempool_header *cur = free_header;
2664           mempool_header *header;
2665           if (current != mempools_head) {
2666             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2667              /* search in free list */
2668             if (header->size == current->size - sizeof(mempool_block)) {
2669               cur = free_header;
2670               while (cur) {
2671                 if (cur->next_free == (char*)header-(char*)mptr) break;
2672                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2673               }
2674               if (cur != NULL) isfree = 1;
2675             }
2676           }
2677           if (isfree) {
2678               /* remove from free list */
2679             cur->next_free = header->next_free;
2680             if (header == free_header) mptr->freelist_head = header->next_free;
2681              // deregister
2682             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2683             GNI_RC_CHECK("steal Mempool de-register", status);
2684             mempool_block *ptr = current;
2685             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2686             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2687 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2688             freed += ptr->size;
2689             free(ptr);
2690              // try now
2691             if (freed > *size) {
2692               if (pool == NULL) {
2693                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2694                 CmiAssert(ret == 0);
2695               }
2696               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2697               if (status == GNI_RC_SUCCESS) {
2698                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2699 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2700                 return pool;
2701               }
2702 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2703             }
2704           }
2705           else {
2706              prev = current;
2707              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2708           }
2709         }
2710
2711         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2712     }
2713       /* still no luck registering pool */
2714     if (pool) free(pool);
2715     return NULL;
2716 }
2717 #endif
2718
2719 static long long int total_mempool_size = 0;
2720 static long long int total_mempool_calls = 0;
2721
2722 #if USE_LRTS_MEMPOOL
2723 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2724 {
2725     void *pool;
2726     int ret;
2727     gni_return_t status = GNI_RC_SUCCESS;
2728
2729     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
2730     if (*size < default_size) *size = default_size;
2731     total_mempool_size += *size;
2732     total_mempool_calls += 1;
2733 #if   !LARGEPAGE
2734     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
2735     {
2736         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
2737         CmiAbort("alloc_mempool_block");
2738     }
2739 #endif
2740     ret = posix_memalign(&pool, ALIGNBUF, *size);
2741     if (ret != 0) {
2742 #if CMK_SMP && STEAL_MEMPOOL
2743       pool = steal_mempool_block(size, mem_hndl);
2744       if (pool != NULL) return pool;
2745 #endif
2746       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2747       if (ret == ENOMEM)
2748         CmiAbort("alloc_mempool_block: out of memory.");
2749       else
2750         CmiAbort("alloc_mempool_block: posix_memalign failed");
2751     }
2752 #if LARGEPAGE
2753     CmiMemLock();
2754     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, status);
2755     CmiMemUnlock();
2756     if(status != GNI_RC_SUCCESS)
2757         printf("[%d, %d] memory reigstration fails %lld ask for %lld\n", myrank, CmiMyRank(), register_memory_size, *size);
2758     GNI_RC_CHECK("MEMORY_REGISTER", status);
2759 #else
2760     SetMemHndlZero((*mem_hndl));
2761 #endif
2762     return pool;
2763 }
2764
2765 // ptr is a block head pointer
2766 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2767 {
2768     if(!(IsMemHndlZero(mem_hndl)))
2769     {
2770         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
2771     }
2772     free(ptr);
2773 }
2774 #endif
2775
2776 void LrtsPreCommonInit(int everReturn){
2777 #if USE_LRTS_MEMPOOL
2778     CpvInitialize(mempool_type*, mempool);
2779     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
2780     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2781 #endif
2782 }
2783
2784 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2785 {
2786     register int            i;
2787     int                     rc;
2788     int                     device_id = 0;
2789     unsigned int            remote_addr;
2790     gni_cdm_handle_t        cdm_hndl;
2791     gni_return_t            status = GNI_RC_SUCCESS;
2792     uint32_t                vmdh_index = -1;
2793     uint8_t                 ptag;
2794     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2795     int                     first_spawned;
2796     int                     physicalID;
2797     char                   *env;
2798
2799     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2800     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2801     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2802    
2803     status = PMI_Init(&first_spawned);
2804     GNI_RC_CHECK("PMI_Init", status);
2805
2806     status = PMI_Get_size(&mysize);
2807     GNI_RC_CHECK("PMI_Getsize", status);
2808
2809     status = PMI_Get_rank(&myrank);
2810     GNI_RC_CHECK("PMI_getrank", status);
2811
2812     //physicalID = CmiPhysicalNodeID(myrank);
2813     
2814     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2815
2816     *myNodeID = myrank;
2817     *numNodes = mysize;
2818   
2819 #if MULTI_THREAD_SEND
2820     /* Currently, we only consider the case that comm. thread will only recv msgs */
2821     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2822 #endif
2823
2824     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2825     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2826     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2827
2828     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2829     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2830     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2831
2832     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
2833     if (env) useDynamicSMSG = 1;
2834     if (!useDynamicSMSG)
2835       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2836     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2837     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2838     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2839     
2840     if(myrank == 0)
2841     {
2842         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2843         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2844     }
2845 #ifdef USE_ONESIDED
2846     onesided_init(NULL, &onesided_hnd);
2847
2848     // this is a GNI test, so use the libonesided bypass functionality
2849     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2850     local_addr = gniGetNicAddress();
2851 #else
2852     ptag = get_ptag();
2853     cookie = get_cookie();
2854 #if 0
2855     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2856 #endif
2857     //Create and attach to the communication  domain */
2858     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2859     GNI_RC_CHECK("GNI_CdmCreate", status);
2860     //* device id The device id is the minor number for the device
2861     //that is assigned to the device by the system when the device is created.
2862     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2863     //where X is the device number 0 default 
2864     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2865     GNI_RC_CHECK("GNI_CdmAttach", status);
2866     local_addr = get_gni_nic_address(0);
2867 #endif
2868     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2869     _MEMCHECK(MPID_UGNI_AllAddr);
2870     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2871     /* create the local completion queue */
2872     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2873     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2874     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2875     
2876     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2877     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2878     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2879
2880     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2881     GNI_RC_CHECK("Create CQ (rx)", status);
2882     
2883     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2884     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2885     
2886     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2887     //GNI_RC_CHECK("Create BTE CQ", status);
2888
2889     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2890     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2891     _MEMCHECK(ep_hndl_array);
2892 #if CMK_SMP && !COMM_THREAD_SEND
2893     tx_cq_lock  = CmiCreateLock();
2894     rx_cq_lock  = CmiCreateLock();
2895 #endif
2896     for (i=0; i<mysize; i++) {
2897         if(i == myrank) continue;
2898         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2899         GNI_RC_CHECK("GNI_EpCreate ", status);   
2900         remote_addr = MPID_UGNI_AllAddr[i];
2901         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2902         GNI_RC_CHECK("GNI_EpBind ", status);   
2903     }
2904
2905     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2906     _init_smsg();
2907     PMI_Barrier();
2908
2909 #if     USE_LRTS_MEMPOOL
2910     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
2911     if (env) {
2912         _totalmem = CmiReadSize(env);
2913         if (myrank == 0)
2914             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
2915     }
2916
2917     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
2918     if (env) _mempool_size = CmiReadSize(env);
2919     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
2920         _mempool_size = CmiReadSize(env);
2921
2922
2923     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2924     if (env) {
2925         MAX_REG_MEM = CmiReadSize(env);
2926         user_set_flag = 1;
2927     }
2928     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
2929         MAX_REG_MEM = CmiReadSize(env);
2930         user_set_flag = 1;
2931     }
2932
2933     env = getenv("CHARM_UGNI_SEND_MAX");
2934     if (env) {
2935         MAX_BUFF_SEND = CmiReadSize(env);
2936         user_set_flag = 1;
2937     }
2938     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
2939         MAX_BUFF_SEND = CmiReadSize(env);
2940         user_set_flag = 1;
2941     }
2942
2943     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
2944     if (env) {
2945         _mempool_size_limit = CmiReadSize(env);
2946     }
2947
2948     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2949     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2950
2951     if (myrank==0) {
2952         printf("Charm++> memory pool init size: %1.fMB, max size: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
2953         printf("Charm++> memory pool max size: %1.fMB, max for send: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
2954         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
2955             /* memblock can expand to BIG_MSG * 2 size */
2956             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
2957             CmiAbort("mempool maximum size is too small. \n");
2958         }
2959 #if CMK_SMP && MULTI_THREAD_SEND
2960         printf("Charm++> worker thread sending messages\n");
2961 #elif CMK_SMP && COMM_THREAD_SEND
2962         printf("Charm++> only comm thread send/recv messages\n");
2963 #endif
2964     }
2965
2966 #endif     /* end of USE_LRTS_MEMPOOL */
2967
2968     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
2969     if (env) {
2970         BIG_MSG = CmiReadSize(env);
2971         if (BIG_MSG < ONE_SEG)
2972           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
2973     }
2974     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
2975     if (env) {
2976         BIG_MSG_PIPELINE = atoi(env);
2977     }
2978
2979     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
2980     if (env) _checkProgress = 0;
2981     if (mysize == 1) _checkProgress = 0;
2982
2983     
2984     /*
2985     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
2986     if (env) 
2987         _tlbpagesize = CmiReadSize(env);
2988     */
2989     /* real gethugepagesize() is only available when hugetlb module linked */
2990     _tlbpagesize = gethugepagesize();
2991     if (myrank == 0) {
2992         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
2993     }
2994
2995     /* init DMA buffer for medium message */
2996
2997     //_init_DMA_buffer();
2998     
2999     free(MPID_UGNI_AllAddr);
3000 #if CMK_SMP
3001     sendRdmaBuf = PCQueueCreate();
3002 #else
3003     sendRdmaBuf = 0;
3004 #endif
3005 #if MACHINE_DEBUG_LOG
3006     char ln[200];
3007     sprintf(ln,"debugLog.%d",myrank);
3008     debugLog=fopen(ln,"w");
3009 #endif
3010
3011 }
3012
3013 void* LrtsAlloc(int n_bytes, int header)
3014 {
3015     void *ptr;
3016 #if 0
3017     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3018 #endif
3019     if(n_bytes <= SMSG_MAX_MSG)
3020     {
3021         int totalsize = n_bytes+header;
3022         ptr = malloc(totalsize);
3023     }
3024     else {
3025         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3026 #if     USE_LRTS_MEMPOOL
3027         n_bytes = ALIGN64(n_bytes);
3028         if(n_bytes < BIG_MSG)
3029         {
3030             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3031             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3032         }else 
3033         {
3034             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3035             ptr = res + ALIGNBUF - header;
3036
3037         }
3038 #else
3039         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3040         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3041         ptr = res + ALIGNBUF - header;
3042 #endif
3043     }
3044     return ptr;
3045 }
3046
3047 void  LrtsFree(void *msg)
3048 {
3049     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3050     if (size <= SMSG_MAX_MSG)
3051       free(msg);
3052     else if(size>=BIG_MSG)
3053     {
3054         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3055
3056     }else
3057     {
3058 #if     USE_LRTS_MEMPOOL
3059 #if CMK_SMP
3060         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3061 #else
3062         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3063 #endif
3064 #else
3065         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3066 #endif
3067     }
3068 }
3069
3070 void LrtsExit()
3071 {
3072     /* free memory ? */
3073 #if USE_LRTS_MEMPOOL
3074     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
3075     mempool_destroy(CpvAccess(mempool));
3076 #endif
3077     PMI_Finalize();
3078     exit(0);
3079 }
3080
3081 void LrtsDrainResources()
3082 {
3083     if(mysize == 1) return;
3084     while (
3085 #if CMK_USE_OOB
3086            !SendBufferMsg(&smsg_oob_queue) ||
3087 #endif
3088            !SendBufferMsg(&smsg_queue))
3089     {
3090         if (useDynamicSMSG)
3091             PumpDatagramConnection();
3092         PumpNetworkSmsg();
3093         PumpLocalRdmaTransactions();
3094         SendRdmaMsg();
3095     }
3096     PMI_Barrier();
3097 }
3098
3099 void LrtsAbort(const char *message) {
3100     printf("CmiAbort is calling on PE:%d\n", myrank);
3101     CmiPrintStackTrace(0);
3102     PMI_Abort(-1, message);
3103 }
3104
3105 /**************************  TIMER FUNCTIONS **************************/
3106 #if CMK_TIMER_USE_SPECIAL
3107 /* MPI calls are not threadsafe, even the timer on some machines */
3108 static CmiNodeLock  timerLock = 0;
3109 static int _absoluteTime = 0;
3110 static int _is_global = 0;
3111 static struct timespec start_ts;
3112
3113 inline int CmiTimerIsSynchronized() {
3114     return 0;
3115 }
3116
3117 inline int CmiTimerAbsolute() {
3118     return _absoluteTime;
3119 }
3120
3121 double CmiStartTimer() {
3122     return 0.0;
3123 }
3124
3125 double CmiInitTime() {
3126     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3127 }
3128
3129 void CmiTimerInit(char **argv) {
3130     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3131     if (_absoluteTime && CmiMyPe() == 0)
3132         printf("Charm++> absolute  timer is used\n");
3133     
3134     _is_global = CmiTimerIsSynchronized();
3135
3136
3137     if (_is_global) {
3138         if (CmiMyRank() == 0) {
3139             clock_gettime(CLOCK_MONOTONIC, &start_ts);
3140         }
3141     } else { /* we don't have a synchronous timer, set our own start time */
3142         CmiBarrier();
3143         CmiBarrier();
3144         CmiBarrier();
3145         clock_gettime(CLOCK_MONOTONIC, &start_ts);
3146     }
3147     CmiNodeAllBarrier();          /* for smp */
3148 }
3149
3150 /**
3151  * Since the timerLock is never created, and is
3152  * always NULL, then all the if-condition inside
3153  * the timer functions could be disabled right
3154  * now in the case of SMP.
3155  */
3156 double CmiTimer(void) {
3157     struct timespec now_ts;
3158     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3159     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3160         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3161 }
3162
3163 double CmiWallTimer(void) {
3164     struct timespec now_ts;
3165     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3166     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3167         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
3168 }
3169
3170 double CmiCpuTimer(void) {
3171     struct timespec now_ts;
3172     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3173     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3174         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3175 }
3176
3177 #endif
3178 /************Barrier Related Functions****************/
3179
3180 int CmiBarrier()
3181 {
3182     gni_return_t status;
3183
3184 #if CMK_SMP
3185     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
3186     CmiNodeAllBarrier();
3187     if (CmiMyRank() == CmiMyNodeSize())
3188 #else
3189     if (CmiMyRank() == 0)
3190 #endif
3191     {
3192         /**
3193          *  The call of CmiBarrier is usually before the initialization
3194          *  of trace module of Charm++, therefore, the START_EVENT
3195          *  and END_EVENT are disabled here. -Chao Mei
3196          */
3197         /*START_EVENT();*/
3198         status = PMI_Barrier();
3199         GNI_RC_CHECK("PMI_Barrier", status);
3200         /*END_EVENT(10);*/
3201     }
3202     CmiNodeAllBarrier();
3203     return status;
3204
3205 }
3206 #if CMK_DIRECT
3207 #include "machine-cmidirect.c"
3208 #endif
3209 #if CMK_PERSISTENT_COMM
3210 #include "machine-persistent.c"
3211 #endif
3212
3213