aa7ef268fb42df01bd4c5e091cf7fc3ada304d44
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #define     LARGEPAGE              0
51
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     1
59 #endif
60
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
64 #define CMI_EXERT_SEND_LARGE_CAP   0
65 #define CMI_EXERT_RECV_RDMA_CAP    0
66
67
68 #define CMI_SENDBUFFERSMSG_CAP     0
69 #define CMI_PUMPNETWORKSMSG_CAP    0
70
71 #if CMI_SENDBUFFERSMSG_CAP
72 int     SendBufferMsg_cap  = 10;
73 #endif
74
75 #if CMI_PUMPNETWORKSMSG_CAP
76 int     PumpNetworkSmsg_cap = 10;
77 #endif
78
79 #if CMI_EXERT_SEND_LARGE_CAP
80 static int SEND_large_cap = 100;
81 static int SEND_large_pending = 0;
82 #endif
83
84 #if CMI_EXERT_RECV_RDMA_CAP
85 static int   RDMA_cap =   100;
86 static int   RDMA_pending = 0;
87 #endif
88
89 #define USE_LRTS_MEMPOOL                  1
90
91 #define PRINT_SYH                         0
92
93 // Trace communication thread
94 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
95 #define TRACE_THRESHOLD     0.00001
96 #define CMI_MPI_TRACE_MOREDETAILED 0
97 #undef CMI_MPI_TRACE_USEREVENTS
98 #define CMI_MPI_TRACE_USEREVENTS 1
99 #else
100 #undef CMK_SMP_TRACE_COMMTHREAD
101 #define CMK_SMP_TRACE_COMMTHREAD 0
102 #endif
103
104 #define CMK_TRACE_COMMOVERHEAD 0
105 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
106 #undef CMI_MPI_TRACE_USEREVENTS
107 #define CMI_MPI_TRACE_USEREVENTS 1
108 #else
109 #undef CMK_TRACE_COMMOVERHEAD
110 #define CMK_TRACE_COMMOVERHEAD 0
111 #endif
112
113 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
114 CpvStaticDeclare(double, projTraceStart);
115 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
116 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
117 #define  EVENT_TIME()   CpvAccess(projTraceStart)
118 #else
119 #define  START_EVENT()
120 #define  END_EVENT(x)
121 #define  EVENT_TIME()   (0.0)
122 #endif
123
124 #if USE_LRTS_MEMPOOL
125
126 #define oneMB (1024ll*1024)
127 #define oneGB (1024ll*1024*1024)
128
129 static CmiInt8 _mempool_size = 8*oneMB;
130 static CmiInt8 _expand_mem =  4*oneMB;
131 static CmiInt8 _mempool_size_limit = 0;
132
133 static CmiInt8 _totalmem = 0.8*oneGB;
134
135 #if LARGEPAGE
136 static CmiInt8 BIG_MSG  =  16*oneMB;
137 static CmiInt8 ONE_SEG  =  4*oneMB;
138 #else
139 static CmiInt8 BIG_MSG  =  4*oneMB;
140 static CmiInt8 ONE_SEG  =  2*oneMB;
141 #endif
142 #if MULTI_THREAD_SEND
143 static int BIG_MSG_PIPELINE = 1;
144 #else
145 static int BIG_MSG_PIPELINE = 4;
146 #endif
147
148 // dynamic flow control
149 static CmiInt8 buffered_send_msg = 0;
150 static CmiInt8 register_memory_size = 0;
151
152 #if LARGEPAGE
153 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
154 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
155 static CmiInt8  register_count = 0;
156 #else
157 #if CMK_SMP && COMM_THREAD_SEND 
158 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
159 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
160 #else
161 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
162 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
163 #endif
164
165
166 #endif
167
168 #endif     /* end USE_LRTS_MEMPOOL */
169
170 #if MULTI_THREAD_SEND 
171 #define     CMI_GNI_LOCK(x)       CmiLock(x);
172 #define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
173 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
174 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
175 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
176 #else
177 #define     CMI_GNI_LOCK(x)
178 #define     CMI_GNI_TRYLOCK(x)         (0)
179 #define     CMI_GNI_UNLOCK(x)
180 #define     CMI_PCQUEUEPOP_LOCK(Q)   
181 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
182 #endif
183
184 static int _tlbpagesize = 4096;
185
186 //static int _smpd_count  = 0;
187
188 static int   user_set_flag  = 0;
189
190 static int _checkProgress = 1;             /* check deadlock */
191 static int _detected_hang = 0;
192
193 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
194
195 // dynamic SMSG
196 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
197
198 static int avg_smsg_connection = 32;
199 static int                 *smsg_connected_flag= 0;
200 static gni_smsg_attr_t     **smsg_attr_vector_local;
201 static gni_smsg_attr_t     **smsg_attr_vector_remote;
202 static gni_ep_handle_t     ep_hndl_unbound;
203 static gni_smsg_attr_t     send_smsg_attr;
204 static gni_smsg_attr_t     recv_smsg_attr;
205
206 typedef struct _dynamic_smsg_mailbox{
207    void     *mailbox_base;
208    int      size;
209    int      offset;
210    gni_mem_handle_t  mem_hndl;
211    struct      _dynamic_smsg_mailbox  *next;
212 }dynamic_smsg_mailbox_t;
213
214 static dynamic_smsg_mailbox_t  *mailbox_list;
215
216 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
217 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
218
219 #if PRINT_SYH
220 int         lrts_send_msg_id = 0;
221 int         lrts_local_done_msg = 0;
222 int         lrts_send_rdma_success = 0;
223 #endif
224
225 #include "machine.h"
226
227 #include "pcqueue.h"
228
229 #include "mempool.h"
230
231 #if CMK_PERSISTENT_COMM
232 #include "machine-persistent.h"
233 #define  SEND_PERSISTENT    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendPersistentBuf));
234 #define  RECV_PERSISTENT    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(persistent_rx_cqh) );
235 #else  
236 #define  SEND_PERSISTENT   
237 #define  RECV_PERSISTENT
238 #endif
239
240 //#define  USE_ONESIDED 1
241 #ifdef USE_ONESIDED
242 //onesided implementation is wrong, since no place to restore omdh
243 #include "onesided.h"
244 onesided_hnd_t   onesided_hnd;
245 onesided_md_t    omdh;
246 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
247
248 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
249
250 #else
251 uint8_t   onesided_hnd, omdh;
252
253 #if REMOTE_EVENT || CQWRITE 
254 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
255     if(register_memory_size+size>= MAX_REG_MEM) { \
256         status = GNI_RC_ERROR_NOMEM;} \
257     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
258         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
259 #else
260 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
261         if (register_memory_size + size >= MAX_REG_MEM) { \
262             status = GNI_RC_ERROR_NOMEM; \
263         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
264             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
265 #endif
266
267 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
268     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
269              register_memory_size -= size; \
270          else CmiAbort("MEM_DEregister");  \
271     } while (0)
272 #endif
273
274 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
275 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
276 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
277 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
278 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
279 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
280 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
281 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
282 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
283 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
284 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
285 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
286 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
287 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
288
289 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
290 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
291
292 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
293 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
294 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
295 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
296
297 #define ALIGNBUF                64
298
299 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
300 /* If SMSG is not used */
301
302 #define FMA_PER_CORE  1024
303 #define FMA_BUFFER_SIZE 1024
304
305 /* If SMSG is used */
306 static int  SMSG_MAX_MSG = 1024;
307 #define SMSG_MAX_CREDIT    72
308
309 #define MSGQ_MAXSIZE       2048
310
311 /* large message transfer with FMA or BTE */
312 #if ! REMOTE_EVENT
313 #define LRTS_GNI_RDMA_THRESHOLD  1024 
314 #else
315    /* remote events only work with RDMA */
316 #define LRTS_GNI_RDMA_THRESHOLD  0 
317 #endif
318
319 #if CMK_SMP
320 static int  REMOTE_QUEUE_ENTRIES=163840; 
321 static int LOCAL_QUEUE_ENTRIES=163840; 
322 #else
323 static int  REMOTE_QUEUE_ENTRIES=20480;
324 static int LOCAL_QUEUE_ENTRIES=20480; 
325 #endif
326
327 #define BIG_MSG_TAG             0x26
328 #define PUT_DONE_TAG            0x28
329 #define DIRECT_PUT_DONE_TAG     0x29
330 #define ACK_TAG                 0x30
331 /* SMSG is data message */
332 #define SMALL_DATA_TAG          0x31
333 /* SMSG is a control message to initialize a BTE */
334 #define LMSG_INIT_TAG           0x39 
335
336 #define DEBUG
337 #ifdef GNI_RC_CHECK
338 #undef GNI_RC_CHECK
339 #endif
340 #ifdef DEBUG
341 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
342 #else
343 #define GNI_RC_CHECK(msg,rc)
344 #endif
345
346 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
347 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
348 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
349
350 static int useStaticMSGQ = 0;
351 static int useStaticFMA = 0;
352 static int mysize, myrank;
353 static gni_nic_handle_t   nic_hndl;
354
355 typedef struct {
356     gni_mem_handle_t mdh;
357     uint64_t addr;
358 } mdh_addr_t ;
359 // this is related to dynamic SMSG
360
361 typedef struct mdh_addr_list{
362     gni_mem_handle_t mdh;
363    void *addr;
364     struct mdh_addr_list *next;
365 }mdh_addr_list_t;
366
367 static unsigned int         smsg_memlen;
368 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
369 mdh_addr_t          setup_mem;
370 mdh_addr_t          *smsg_connection_vec = 0;
371 gni_mem_handle_t    smsg_connection_memhndl;
372 static int          smsg_expand_slots = 10;
373 static int          smsg_available_slot = 0;
374 static void         *smsg_mailbox_mempool = 0;
375 mdh_addr_list_t     *smsg_dynamic_list = 0;
376
377 static void             *smsg_mailbox_base;
378 gni_msgq_attr_t         msgq_attrs;
379 gni_msgq_handle_t       msgq_handle;
380 gni_msgq_ep_attr_t      msgq_ep_attrs;
381 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
382
383 /* =====Beginning of Declarations of Machine Specific Variables===== */
384 static int cookie;
385 static int modes = 0;
386 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
387 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
388 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
389 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
390 static gni_cq_handle_t       persistent_rx_cqh = NULL;      // mempool - remote event
391 static gni_ep_handle_t       *ep_hndl_array;
392
393 static CmiNodeLock           *ep_lock_array;
394 static CmiNodeLock           default_tx_cq_lock; 
395 static CmiNodeLock           rdma_tx_cq_lock; 
396 static CmiNodeLock           global_gni_lock; 
397 static CmiNodeLock           rx_cq_lock;
398 static CmiNodeLock           smsg_mailbox_lock;
399 static CmiNodeLock           smsg_rx_cq_lock;
400 static CmiNodeLock           *mempool_lock;
401 //#define     CMK_WITH_STATS      1
402 typedef struct msg_list
403 {
404     uint32_t destNode;
405     uint32_t size;
406     void *msg;
407     uint8_t tag;
408 #if !CMK_SMP
409     struct msg_list *next;
410 #endif
411 #if CMK_WITH_STATS
412     double  creation_time;
413 #endif
414 }MSG_LIST;
415
416
417 typedef struct control_msg
418 {
419     uint64_t            source_addr;    /* address from the start of buffer  */
420     uint64_t            dest_addr;      /* address from the start of buffer */
421     int                 total_length;   /* total length */
422     int                 length;         /* length of this packet */
423 #if REMOTE_EVENT
424     int                 ack_index;      /* index from integer to address */
425 #endif
426     uint8_t             seq_id;         //big message   0 meaning single message
427     gni_mem_handle_t    source_mem_hndl;
428     struct control_msg *next;
429 } CONTROL_MSG;
430
431 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
432
433 typedef struct ack_msg
434 {
435     uint64_t            source_addr;    /* address from the start of buffer  */
436 #if ! USE_LRTS_MEMPOOL
437     gni_mem_handle_t    source_mem_hndl;
438     int                 length;          /* total length */
439 #endif
440     struct ack_msg     *next;
441 } ACK_MSG;
442
443 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
444
445 #if CMK_DIRECT
446 typedef struct{
447     uint64_t    handler_addr;
448 }CMK_DIRECT_HEADER;
449
450 typedef struct {
451     char core[CmiMsgHeaderSizeBytes];
452     uint64_t handler;
453 }cmidirectMsg;
454
455 //SYH
456 CpvDeclare(int, CmiHandleDirectIdx);
457 void CmiHandleDirectMsg(cmidirectMsg* msg)
458 {
459
460     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
461    (*(_handle->callbackFnPtr))(_handle->callbackData);
462    CmiFree(msg);
463 }
464
465 void CmiDirectInit()
466 {
467     CpvInitialize(int,  CmiHandleDirectIdx);
468     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
469 }
470
471 #endif
472 typedef struct  rmda_msg
473 {
474     int                   destNode;
475 #if REMOTE_EVENT
476     int                   ack_index;
477 #endif
478     gni_post_descriptor_t *pd;
479 #if !CMK_SMP
480     struct  rmda_msg      *next;
481 #endif
482 }RDMA_REQUEST;
483
484
485 #if CMK_SMP
486 #define SMP_LOCKS                       1
487 #define ONE_SEND_QUEUE                  0
488 PCQueue sendRdmaBuf;
489 PCQueue sendPersistentBuf;
490 typedef struct  msg_list_index
491 {
492     PCQueue       sendSmsgBuf;
493 #if  SMP_LOCKS
494     CmiNodeLock   lock;
495     int           pushed;
496     int           destpe;
497 #endif
498 } MSG_LIST_INDEX;
499 char                *destpe_avail;
500 #else         /* non-smp */
501
502 static RDMA_REQUEST        *sendRdmaBuf = 0;
503 static RDMA_REQUEST        *sendPersistentBuf = 0;
504 static RDMA_REQUEST        *sendRdmaTail = 0;
505 typedef struct  msg_list_index
506 {
507     int         next;
508     MSG_LIST    *sendSmsgBuf;
509     MSG_LIST    *tail;
510 } MSG_LIST_INDEX;
511
512 #endif
513
514 // buffered send queue
515 #if ! ONE_SEND_QUEUE
516 typedef struct smsg_queue
517 {
518     MSG_LIST_INDEX   *smsg_msglist_index;
519     int               smsg_head_index;
520 #if  SMP_LOCKS
521     PCQueue     nonEmptyQueues;
522 #endif
523 } SMSG_QUEUE;
524 #else
525 typedef struct smsg_queue
526 {
527     PCQueue       sendMsgBuf;
528 }  SMSG_QUEUE;
529 #endif
530
531 SMSG_QUEUE                  smsg_queue;
532 #if CMK_USE_OOB
533 SMSG_QUEUE                  smsg_oob_queue;
534 #endif
535
536 #if CMK_SMP
537
538 #define FreeMsgList(d)   free(d);
539 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
540
541 #else
542
543 static MSG_LIST       *msglist_freelist=0;
544
545 #define FreeMsgList(d)  \
546   do { \
547   (d)->next = msglist_freelist;\
548   msglist_freelist = d; \
549   } while (0)
550
551 #define MallocMsgList(d) \
552   do {  \
553   d = msglist_freelist;\
554   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
555              _MEMCHECK(d);\
556   } else msglist_freelist = d->next; \
557   d->next =0;  \
558   } while (0)
559
560 #endif
561
562 #if CMK_SMP
563
564 #define FreeControlMsg(d)      free(d);
565 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
566
567 #else
568
569 static CONTROL_MSG    *control_freelist=0;
570
571 #define FreeControlMsg(d)       \
572   do { \
573   (d)->next = control_freelist;\
574   control_freelist = d; \
575   } while (0);
576
577 #define MallocControlMsg(d) \
578   do {  \
579   d = control_freelist;\
580   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
581              _MEMCHECK(d);\
582   } else control_freelist = d->next;  \
583   } while (0);
584
585 #endif
586
587 #if CMK_SMP
588
589 #define FreeAckMsg(d)      free(d);
590 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
591
592 #else
593
594 static ACK_MSG        *ack_freelist=0;
595
596 #define FreeAckMsg(d)       \
597   do { \
598   (d)->next = ack_freelist;\
599   ack_freelist = d; \
600   } while (0)
601
602 #define MallocAckMsg(d) \
603   do { \
604   d = ack_freelist;\
605   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
606              _MEMCHECK(d);\
607   } else ack_freelist = d->next; \
608   } while (0)
609
610 #endif
611
612
613 # if CMK_SMP
614 #define FreeRdmaRequest(d)       free(d);
615 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
616 #else
617
618 static RDMA_REQUEST         *rdma_freelist = NULL;
619
620 #define FreeRdmaRequest(d)       \
621   do {  \
622   (d)->next = rdma_freelist;\
623   rdma_freelist = d;    \
624   } while (0)
625
626 #define MallocRdmaRequest(d) \
627   do {   \
628   d = rdma_freelist;\
629   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
630              _MEMCHECK(d);\
631   } else rdma_freelist = d->next; \
632   d->next =0;   \
633   } while (0)
634 #endif
635
636 /* reuse gni_post_descriptor_t */
637 static gni_post_descriptor_t *post_freelist=0;
638
639 #if  !CMK_SMP
640 #define FreePostDesc(d)       \
641     (d)->next_descr = post_freelist;\
642     post_freelist = d;
643
644 #define MallocPostDesc(d) \
645   d = post_freelist;\
646   if (d==0) { \
647      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
648      d->next_descr = 0;\
649       _MEMCHECK(d);\
650   } else post_freelist = d->next_descr;
651 #else
652
653 #define FreePostDesc(d)     free(d);
654 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
655
656 #endif
657
658
659 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
660 static int      buffered_smsg_counter = 0;
661
662 /* SmsgSend return success but message sent is not confirmed by remote side */
663 static MSG_LIST *buffered_fma_head = 0;
664 static MSG_LIST *buffered_fma_tail = 0;
665
666 /* functions  */
667 #define IsFree(a,ind)  !( a& (1<<(ind) ))
668 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
669 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
670
671 CpvDeclare(mempool_type*, mempool);
672
673 #if CMK_PERSISTENT_COMM
674 CpvDeclare(mempool_type*, persistent_mempool);
675 #endif
676
677 #if REMOTE_EVENT
678 /* ack pool for remote events */
679
680 static int  SHIFT   =           18;
681 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
682 #define RANK_MASK               ((1<<SHIFT) - 1)
683 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
684
685 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
686 #define GET_RANK(evt)           ((evt) & RANK_MASK)
687 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
688
689 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
690
691 #if CMK_SMP
692 #define INIT_SIZE                4096
693 #else
694 #define INIT_SIZE                1024
695 #endif
696
697 struct IndexStruct {
698 void *addr;
699 int next;
700 int type;     // 1: ACK   2: Persistent
701 };
702
703 typedef struct IndexPool {
704     struct IndexStruct   *indexes;
705     int                   size;
706     int                   freehead;
707     CmiNodeLock           lock;
708 } IndexPool;
709
710 static IndexPool  ackPool;
711 #if CMK_PERSISTENT_COMM
712 static IndexPool  persistPool;
713 #endif
714
715 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
716 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
717
718 static void IndexPool_init(IndexPool *pool)
719 {
720     int i;
721     if ((1<<SHIFT) < mysize) 
722         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
723     pool->size = INIT_SIZE;
724     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
725     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
726     for (i=0; i<pool->size-1; i++) {
727         pool->indexes[i].next = i+1;
728         pool->indexes[i].type = 0;
729     }
730     pool->indexes[i].next = -1;
731     pool->freehead = 0;
732 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
733     pool->lock  = CmiCreateLock();
734 #else
735     pool->lock  = 0;
736 #endif
737 }
738
739 static
740 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
741 {
742     int s, i;
743 #if MULTI_THREAD_SEND  
744     CmiLock(pool->lock);
745 #endif
746     s = pool->freehead;
747     if (s == -1) {
748         int newsize = pool->size * 2;
749         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
750         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
751         struct IndexStruct *old_ackpool = pool->indexes;
752         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
753         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
754         for (i=pool->size; i<newsize-1; i++) {
755             pool->indexes[i].next = i+1;
756             pool->indexes[i].type = 0;
757         }
758         pool->indexes[i].next = -1;
759         pool->indexes[i].type = 0;
760         pool->freehead = pool->size;
761         s = pool->size;
762         pool->size = newsize;
763         free(old_ackpool);
764     }
765     pool->freehead = pool->indexes[s].next;
766     pool->indexes[s].addr = addr;
767     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
768     pool->indexes[s].type = type;
769 #if MULTI_THREAD_SEND
770     CmiUnlock(pool->lock);
771 #endif
772     return s;
773 }
774
775 static
776 inline  void IndexPool_freeslot(IndexPool *pool, int s)
777 {
778     CmiAssert(s>=0 && s<pool->size);
779 #if MULTI_THREAD_SEND
780     CmiLock(pool->lock);
781 #endif
782     pool->indexes[s].next = pool->freehead;
783     pool->indexes[s].type = 0;
784     pool->freehead = s;
785 #if MULTI_THREAD_SEND
786     CmiUnlock(pool->lock);
787 #endif
788 }
789
790
791 #endif
792
793 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
794 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
795 #define CHARM_MAGIC_NUMBER               126
796
797 #if CMK_ERROR_CHECKING
798 extern unsigned char computeCheckSum(unsigned char *data, int len);
799 static int checksum_flag = 0;
800 #define CMI_SET_CHECKSUM(msg, len)      \
801         if (checksum_flag)  {   \
802           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
803           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
804         }
805 #define CMI_CHECK_CHECKSUM(msg, len)    \
806         if (checksum_flag)      \
807           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
808             CmiAbort("Fatal error: checksum doesn't agree!\n");
809 #else
810 #define CMI_SET_CHECKSUM(msg, len)
811 #define CMI_CHECK_CHECKSUM(msg, len)
812 #endif
813 /* =====End of Definitions of Message-Corruption Related Macros=====*/
814
815 static int print_stats = 0;
816 static int stats_off = 0;
817 void CmiTurnOnStats()
818 {
819     stats_off = 0;
820     //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
821 }
822
823 void CmiTurnOffStats()
824 {
825     stats_off = 1;
826 }
827
828 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
829
830 #if CMK_WITH_STATS
831 FILE *counterLog = NULL;
832 typedef struct comm_thread_stats
833 {
834     uint64_t  smsg_data_count;
835     uint64_t  lmsg_init_count;
836     uint64_t  ack_count;
837     uint64_t  big_msg_ack_count;
838     uint64_t  smsg_count;
839     uint64_t  direct_put_done_count;
840     uint64_t  put_done_count;
841     //times of calling SmsgSend
842     uint64_t  try_smsg_data_count;
843     uint64_t  try_lmsg_init_count;
844     uint64_t  try_ack_count;
845     uint64_t  try_big_msg_ack_count;
846     uint64_t  try_direct_put_done_count;
847     uint64_t  try_put_done_count;
848     uint64_t  try_smsg_count;
849     
850     double    max_time_in_send_buffered_smsg;
851     double    all_time_in_send_buffered_smsg;
852
853     uint64_t  rdma_get_count, rdma_put_count;
854     uint64_t  try_rdma_get_count, try_rdma_put_count;
855     double    max_time_from_control_to_rdma_init;
856     double    all_time_from_control_to_rdma_init;
857
858     double    max_time_from_rdma_init_to_rdma_done;
859     double    all_time_from_rdma_init_to_rdma_done;
860
861     int      count_in_PumpNetwork;
862     double   time_in_PumpNetwork;
863     double   max_time_in_PumpNetwork;
864     int      count_in_SendBufferMsg_smsg;
865     double   time_in_SendBufferMsg_smsg;
866     double   max_time_in_SendBufferMsg_smsg;
867     int      count_in_SendRdmaMsg;
868     double   time_in_SendRdmaMsg;
869     double   max_time_in_SendRdmaMsg;
870     int      count_in_PumpRemoteTransactions;
871     double   time_in_PumpRemoteTransactions;
872     double   max_time_in_PumpRemoteTransactions;
873     int      count_in_PumpLocalTransactions_rdma;
874     double   time_in_PumpLocalTransactions_rdma;
875     double   max_time_in_PumpLocalTransactions_rdma;
876     int      count_in_PumpDatagramConnection;
877     double   time_in_PumpDatagramConnection;
878     double   max_time_in_PumpDatagramConnection;
879 } Comm_Thread_Stats;
880
881 static Comm_Thread_Stats   comm_stats;
882
883 static char *counters_dirname = "counters";
884
885 static void init_comm_stats()
886 {
887   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
888   if (print_stats){
889       char ln[200];
890       int code = mkdir(counters_dirname, 00777); 
891       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
892       counterLog=fopen(ln,"w");
893       if (counterLog == NULL) CmiAbort("Counter files open failed");
894   }
895 }
896
897 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
898
899 #define SMSG_SENT_DONE(creation_time, tag)  \
900         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
901             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
902             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
903             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
904             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
905             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
906             comm_stats.smsg_count++; \
907             double inbuff_time = CmiWallTimer() - creation_time;   \
908             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
909             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
910         }
911
912 #define SMSG_TRY_SEND(tag)  \
913         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
914             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
915             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
916             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
917             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
918             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
919             comm_stats.try_smsg_count++; \
920         }
921
922 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
923
924 #define  RDMA_TRANS_DONE(x)      \
925          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
926              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
927              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
928          }
929
930 #define  RDMA_TRANS_INIT(type, x)      \
931          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
932              double rdma_trans_time = CmiWallTimer() - x ; \
933              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
934              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
935          }
936
937 #define STATS_PUMPNETWORK_TIME(x)   \
938         { double t = CmiWallTimer(); \
939           x;        \
940           t = CmiWallTimer() - t;          \
941           comm_stats.count_in_PumpNetwork++;        \
942           comm_stats.time_in_PumpNetwork += t;   \
943           if (t>comm_stats.max_time_in_PumpNetwork)      \
944               comm_stats.max_time_in_PumpNetwork = t;    \
945         }
946
947 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
948         { double t = CmiWallTimer(); \
949           x;        \
950           t = CmiWallTimer() - t;          \
951           comm_stats.count_in_PumpRemoteTransactions ++;        \
952           comm_stats.time_in_PumpRemoteTransactions += t;   \
953           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
954               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
955         }
956
957 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
958         { double t = CmiWallTimer(); \
959           x;        \
960           t = CmiWallTimer() - t;          \
961           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
962           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
963           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
964               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
965         }
966
967 #define STATS_SEND_SMSGS_TIME(x)   \
968         { double t = CmiWallTimer(); \
969           x;        \
970           t = CmiWallTimer() - t;          \
971           comm_stats.count_in_SendBufferMsg_smsg ++;        \
972           comm_stats.time_in_SendBufferMsg_smsg += t;   \
973           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
974               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
975         }
976
977 #define STATS_SENDRDMAMSG_TIME(x)   \
978         { double t = CmiWallTimer(); \
979           x;        \
980           t = CmiWallTimer() - t;          \
981           comm_stats.count_in_SendRdmaMsg ++;        \
982           comm_stats.time_in_SendRdmaMsg += t;   \
983           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
984               comm_stats.max_time_in_SendRdmaMsg = t;    \
985         }
986
987 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
988         { double t = CmiWallTimer(); \
989           x;        \
990           t = CmiWallTimer() - t;          \
991           comm_stats.count_in_PumpDatagramConnection ++;        \
992           comm_stats.time_in_PumpDatagramConnection += t;   \
993           if (t>comm_stats.max_time_in_PumpDatagramConnection)      \
994               comm_stats.max_time_in_PumpDatagramConnection = t;    \
995         }
996
997 static void print_comm_stats()
998 {
999     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
1000     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
1001             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
1002             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
1003     
1004     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
1005             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
1006             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
1007
1008     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
1009     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
1010     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
1011
1012
1013     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
1014     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
1015     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
1016     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
1017     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
1018     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
1019     if (useDynamicSMSG)
1020     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
1021
1022     fclose(counterLog);
1023 }
1024
1025 #else
1026 #define STATS_PUMPNETWORK_TIME(x)                  x
1027 #define STATS_SEND_SMSGS_TIME(x)                   x
1028 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
1029 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
1030 #define STATS_SENDRDMAMSG_TIME(x)                  x
1031 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
1032 #endif
1033
1034 static void
1035 allgather(void *in,void *out, int len)
1036 {
1037     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1038     int i,rc;
1039     int my_rank;
1040     char *tmp_buf,*out_ptr;
1041
1042     if(!already_called) {
1043
1044         rc = PMI_Get_size(&job_size);
1045         CmiAssert(rc == PMI_SUCCESS);
1046         rc = PMI_Get_rank(&my_rank);
1047         CmiAssert(rc == PMI_SUCCESS);
1048
1049         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1050         CmiAssert(ivec_ptr != NULL);
1051
1052         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1053         CmiAssert(rc == PMI_SUCCESS);
1054
1055         already_called = 1;
1056
1057     }
1058
1059     tmp_buf = (char *)malloc(job_size * len);
1060     CmiAssert(tmp_buf);
1061
1062     rc = PMI_Allgather(in,tmp_buf,len);
1063     CmiAssert(rc == PMI_SUCCESS);
1064
1065     out_ptr = out;
1066
1067     for(i=0;i<job_size;i++) {
1068
1069         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1070
1071     }
1072
1073     free(tmp_buf);
1074 }
1075
1076 static void
1077 allgather_2(void *in,void *out, int len)
1078 {
1079     //PMI_Allgather is out of order
1080     int i,rc, extend_len;
1081     int  rank_index;
1082     char *out_ptr, *out_ref;
1083     char *in2;
1084
1085     extend_len = sizeof(int) + len;
1086     in2 = (char*)malloc(extend_len);
1087
1088     memcpy(in2, &myrank, sizeof(int));
1089     memcpy(in2+sizeof(int), in, len);
1090
1091     out_ptr = (char*)malloc(mysize*extend_len);
1092
1093     rc = PMI_Allgather(in2, out_ptr, extend_len);
1094     GNI_RC_CHECK("allgather", rc);
1095
1096     out_ref = out;
1097
1098     for(i=0;i<mysize;i++) {
1099         //rank index 
1100         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1101         //copy to the rank index slot
1102         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1103     }
1104
1105     free(out_ptr);
1106     free(in2);
1107
1108 }
1109
1110 static unsigned int get_gni_nic_address(int device_id)
1111 {
1112     unsigned int address, cpu_id;
1113     gni_return_t status;
1114     int i, alps_dev_id=-1,alps_address=-1;
1115     char *token, *p_ptr;
1116
1117     p_ptr = getenv("PMI_GNI_DEV_ID");
1118     if (!p_ptr) {
1119         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1120        
1121         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1122     } else {
1123         while ((token = strtok(p_ptr,":")) != NULL) {
1124             alps_dev_id = atoi(token);
1125             if (alps_dev_id == device_id) {
1126                 break;
1127             }
1128             p_ptr = NULL;
1129         }
1130         CmiAssert(alps_dev_id != -1);
1131         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1132         CmiAssert(p_ptr != NULL);
1133         i = 0;
1134         while ((token = strtok(p_ptr,":")) != NULL) {
1135             if (i == alps_dev_id) {
1136                 alps_address = atoi(token);
1137                 break;
1138             }
1139             p_ptr = NULL;
1140             ++i;
1141         }
1142         CmiAssert(alps_address != -1);
1143         address = alps_address;
1144     }
1145     return address;
1146 }
1147
1148 static uint8_t get_ptag(void)
1149 {
1150     char *p_ptr, *token;
1151     uint8_t ptag;
1152
1153     p_ptr = getenv("PMI_GNI_PTAG");
1154     CmiAssert(p_ptr != NULL);
1155     token = strtok(p_ptr, ":");
1156     ptag = (uint8_t)atoi(token);
1157     return ptag;
1158         
1159 }
1160
1161 static uint32_t get_cookie(void)
1162 {
1163     uint32_t cookie;
1164     char *p_ptr, *token;
1165
1166     p_ptr = getenv("PMI_GNI_COOKIE");
1167     CmiAssert(p_ptr != NULL);
1168     token = strtok(p_ptr, ":");
1169     cookie = (uint32_t)atoi(token);
1170
1171     return cookie;
1172 }
1173
1174 #if LARGEPAGE
1175
1176 /* directly mmap memory from hugetlbfs for large pages */
1177
1178 #include <sys/stat.h>
1179 #include <fcntl.h>
1180 #include <sys/mman.h>
1181 #include <hugetlbfs.h>
1182
1183 // size must be _tlbpagesize aligned
1184 void *my_get_huge_pages(size_t size)
1185 {
1186     char filename[512];
1187     int fd;
1188     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1189     void *ptr = NULL;
1190
1191     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1192     fd = open(filename, O_RDWR | O_CREAT, mode);
1193     if (fd == -1) {
1194         CmiAbort("my_get_huge_pages: open filed");
1195     }
1196     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1197     if (ptr == MAP_FAILED) ptr = NULL;
1198 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1199     close(fd);
1200     unlink(filename);
1201     return ptr;
1202 }
1203
1204 void my_free_huge_pages(void *ptr, int size)
1205 {
1206 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1207     int ret = munmap(ptr, size);
1208     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1209 }
1210
1211 #endif
1212
1213 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1214 /* TODO: add any that are related */
1215 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1216
1217
1218 #include "machine-lrts.h"
1219 #include "machine-common-core.c"
1220
1221 /* Network progress function is used to poll the network when for
1222    messages. This flushes receive buffers on some  implementations*/
1223 #if CMK_MACHINE_PROGRESS_DEFINED
1224 void CmiMachineProgressImpl() {
1225 }
1226 #endif
1227
1228 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
1229 #if CMK_SMP
1230 static void SendRdmaMsg(PCQueue );
1231 #else
1232 static void SendRdmaMsg();
1233 #endif 
1234 static void PumpNetworkSmsg();
1235 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1236 #if CQWRITE
1237 static void PumpCqWriteTransactions();
1238 #endif
1239 #if REMOTE_EVENT
1240 static void PumpRemoteTransactions(gni_cq_handle_t);
1241 #endif
1242
1243 #if MACHINE_DEBUG_LOG
1244 FILE *debugLog = NULL;
1245 static CmiInt8 buffered_recv_msg = 0;
1246 int         lrts_smsg_success = 0;
1247 int         lrts_received_msg = 0;
1248 #endif
1249
1250 static void sweep_mempool(mempool_type *mptr)
1251 {
1252     int n = 0;
1253     block_header *current = &(mptr->block_head);
1254
1255     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1256     while( current!= NULL) {
1257         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1258         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1259     }
1260     printf("[n %d] sweep_mempool slot END.\n", myrank);
1261 }
1262
1263 inline
1264 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1265 {
1266     block_header *current = *from;
1267
1268     //while(register_memory_size>= MAX_REG_MEM)
1269     //{
1270         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1271             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1272
1273         *from = current;
1274         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1275         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1276         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1277     //}
1278     return GNI_RC_SUCCESS;
1279 }
1280
1281 inline 
1282 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1283 {
1284     gni_return_t status = GNI_RC_SUCCESS;
1285     //int size = GetMempoolsize(msg);
1286     //void *blockaddr = GetMempoolBlockPtr(msg);
1287     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1288    
1289     block_header *current = &(mptr->block_head);
1290     while(register_memory_size>= MAX_REG_MEM)
1291     {
1292         status = deregisterMemory(mptr, &current);
1293         if (status != GNI_RC_SUCCESS) break;
1294     }
1295     if(register_memory_size>= MAX_REG_MEM) return status;
1296
1297     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1298     while(1)
1299     {
1300         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1301         if(status == GNI_RC_SUCCESS)
1302         {
1303             break;
1304         }
1305         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1306         {
1307             GNI_RC_CHECK("registerFromMempool", status);
1308         }
1309         else
1310         {
1311             status = deregisterMemory(mptr, &current);
1312             if (status != GNI_RC_SUCCESS) break;
1313         }
1314     }; 
1315     return status;
1316 }
1317
1318 inline 
1319 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1320 {
1321     static int rank = -1;
1322     int i;
1323     gni_return_t status;
1324     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1325     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1326     mempool_type *mptr;
1327
1328     status = registerFromMempool(mptr1, msg, size, t, cqh);
1329     if (status == GNI_RC_SUCCESS) return status;
1330 #if CMK_SMP 
1331     for (i=0; i<CmiMyNodeSize()+1; i++) {
1332       rank = (rank+1)%(CmiMyNodeSize()+1);
1333       mptr = CpvAccessOther(mempool, rank);
1334       if (mptr == mptr1) continue;
1335       status = registerFromMempool(mptr, msg, size, t, cqh);
1336       if (status == GNI_RC_SUCCESS) return status;
1337     }
1338 #endif
1339     return  GNI_RC_ERROR_RESOURCE;
1340 }
1341
1342 inline
1343 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1344 {
1345     MSG_LIST        *msg_tmp;
1346     MallocMsgList(msg_tmp);
1347     msg_tmp->destNode = destNode;
1348     msg_tmp->size   = size;
1349     msg_tmp->msg    = msg;
1350     msg_tmp->tag    = tag;
1351 #if CMK_WITH_STATS
1352     SMSG_CREATION(msg_tmp)
1353 #endif
1354 #if !CMK_SMP
1355     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1356         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1357         queue->smsg_head_index = destNode;
1358         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1359     }else
1360     {
1361         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1362         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1363     }
1364 #else
1365 #if ONE_SEND_QUEUE
1366     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1367 #else
1368 #if SMP_LOCKS
1369     CmiLock(queue->smsg_msglist_index[destNode].lock);
1370     if(queue->smsg_msglist_index[destNode].pushed == 0)
1371     {
1372         PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1373     }
1374     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1375     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1376 #else
1377     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1378 #endif
1379 #endif
1380 #endif
1381 #if PRINT_SYH
1382     buffered_smsg_counter++;
1383 #endif
1384 }
1385
1386 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1387 {
1388     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1389 }
1390
1391 inline
1392 static void setup_smsg_connection(int destNode)
1393 {
1394     mdh_addr_list_t  *new_entry = 0;
1395     gni_post_descriptor_t *pd;
1396     gni_smsg_attr_t      *smsg_attr;
1397     gni_return_t status = GNI_RC_NOT_DONE;
1398     RDMA_REQUEST        *rdma_request_msg;
1399     
1400     if(smsg_available_slot == smsg_expand_slots)
1401     {
1402         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1403         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1404         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1405
1406         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1407             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1408             GNI_MEM_READWRITE,   
1409             -1,
1410             &(new_entry->mdh));
1411         smsg_available_slot = 0; 
1412         new_entry->next = smsg_dynamic_list;
1413         smsg_dynamic_list = new_entry;
1414     }
1415     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1416     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1417     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1418     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1419     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1420     smsg_attr->buff_size = smsg_memlen;
1421     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1422     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1423     smsg_local_attr_vec[destNode] = smsg_attr;
1424     smsg_available_slot++;
1425     MallocPostDesc(pd);
1426     pd->type            = GNI_POST_FMA_PUT;
1427     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1428     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1429     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1430     pd->length          = sizeof(gni_smsg_attr_t);
1431     pd->local_addr      = (uint64_t) smsg_attr;
1432     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1433     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1434 #if MULTI_THREAD_SEND
1435     pd->src_cq_hndl     = rdma_tx_cqh;
1436 #else
1437     pd->src_cq_hndl     = 0;
1438 #endif
1439     pd->rdma_mode       = 0;
1440     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1441     print_smsg_attr(smsg_attr);
1442     if(status == GNI_RC_ERROR_RESOURCE )
1443     {
1444         MallocRdmaRequest(rdma_request_msg);
1445         rdma_request_msg->destNode = destNode;
1446         rdma_request_msg->pd = pd;
1447         /* buffer this request */
1448     }
1449 #if PRINT_SYH
1450     if(status != GNI_RC_SUCCESS)
1451        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1452     else
1453         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1454 #endif
1455 }
1456
1457 /* useDynamicSMSG */
1458 inline 
1459 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1460 {
1461     gni_return_t status = GNI_RC_NOT_DONE;
1462
1463     if(mailbox_list->offset == mailbox_list->size)
1464     {
1465         dynamic_smsg_mailbox_t *new_mailbox_entry;
1466         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1467         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1468         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1469         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1470         new_mailbox_entry->offset = 0;
1471         
1472         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1473             new_mailbox_entry->size, smsg_rx_cqh,
1474             GNI_MEM_READWRITE,   
1475             -1,
1476             &(new_mailbox_entry->mem_hndl));
1477
1478         GNI_RC_CHECK("register", status);
1479         new_mailbox_entry->next = mailbox_list;
1480         mailbox_list = new_mailbox_entry;
1481     }
1482     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1483     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1484     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1485     local_smsg_attr->mbox_offset = mailbox_list->offset;
1486     mailbox_list->offset += smsg_memlen;
1487     local_smsg_attr->buff_size = smsg_memlen;
1488     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1489     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1490 }
1491
1492 /* useDynamicSMSG */
1493 inline 
1494 static int connect_to(int destNode)
1495 {
1496     gni_return_t status = GNI_RC_NOT_DONE;
1497     CmiAssert(smsg_connected_flag[destNode] == 0);
1498     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1499     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1500     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1501     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1502     
1503     CMI_GNI_LOCK(global_gni_lock)
1504     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1505     CMI_GNI_UNLOCK(global_gni_lock)
1506     if (status == GNI_RC_ERROR_RESOURCE) {
1507       /* possibly destNode is making connection at the same time */
1508       free(smsg_attr_vector_local[destNode]);
1509       smsg_attr_vector_local[destNode] = NULL;
1510       free(smsg_attr_vector_remote[destNode]);
1511       smsg_attr_vector_remote[destNode] = NULL;
1512       mailbox_list->offset -= smsg_memlen;
1513 #if PRINT_SYH
1514     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1515 #endif
1516       return 0;
1517     }
1518     GNI_RC_CHECK("GNI_Post", status);
1519     smsg_connected_flag[destNode] = 1;
1520 #if PRINT_SYH
1521     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1522 #endif
1523     return 1;
1524 }
1525
1526 inline 
1527 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1528 {
1529     unsigned int          remote_address;
1530     uint32_t              remote_id;
1531     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1532     gni_smsg_attr_t       *smsg_attr;
1533     gni_post_descriptor_t *pd;
1534     gni_post_state_t      post_state;
1535     char                  *real_data; 
1536
1537     if (useDynamicSMSG) {
1538         switch (smsg_connected_flag[destNode]) {
1539         case 0: 
1540             connect_to(destNode);         /* continue to case 1 */
1541         case 1:                           /* pending connection, do nothing */
1542             status = GNI_RC_NOT_DONE;
1543             if(inbuff ==0)
1544                 buffer_small_msgs(queue, msg, size, destNode, tag);
1545             return status;
1546         }
1547     }
1548 #if CMK_SMP
1549 #if ! ONE_SEND_QUEUE
1550     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1551 #endif
1552     {
1553 #else
1554     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1555     {
1556 #endif
1557         //CMI_GNI_LOCK(smsg_mailbox_lock)
1558         CMI_GNI_LOCK(default_tx_cq_lock)
1559 #if CMK_SMP_TRACE_COMMTHREAD
1560         int oldpe = -1;
1561         int oldeventid = -1;
1562         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1563         { 
1564             START_EVENT();
1565             if ( tag == SMALL_DATA_TAG)
1566                 real_data = (char*)msg; 
1567             else 
1568                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1569             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1570             TRACE_COMM_SET_COMM_MSGID(real_data);
1571         }
1572 #endif
1573 #if REMOTE_EVENT
1574         if (tag == LMSG_INIT_TAG) {
1575             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1576             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1577                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1578         }
1579         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1580 #endif
1581 #if     CMK_WITH_STATS
1582         SMSG_TRY_SEND(tag)
1583 #endif
1584 #if CMK_WITH_STATS
1585     double              creation_time;
1586     if (ptr == NULL)
1587         creation_time = CmiWallTimer();
1588     else
1589         creation_time = ptr->creation_time;
1590 #endif
1591
1592     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1593 #if CMK_SMP_TRACE_COMMTHREAD
1594         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1595 #endif
1596         CMI_GNI_UNLOCK(default_tx_cq_lock)
1597         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1598         if(status == GNI_RC_SUCCESS)
1599         {
1600 #if     CMK_WITH_STATS
1601             SMSG_SENT_DONE(creation_time,tag) 
1602 #endif
1603 #if CMK_SMP_TRACE_COMMTHREAD
1604             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1605             { 
1606                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1607             }
1608 #endif
1609         }else
1610             status = GNI_RC_ERROR_RESOURCE;
1611     }
1612     if(status != GNI_RC_SUCCESS && inbuff ==0)
1613         buffer_small_msgs(queue, msg, size, destNode, tag);
1614     return status;
1615 }
1616
1617 inline 
1618 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1619 {
1620     /* construct a control message and send */
1621     CONTROL_MSG         *control_msg_tmp;
1622     MallocControlMsg(control_msg_tmp);
1623     control_msg_tmp->source_addr = (uint64_t)msg;
1624     control_msg_tmp->seq_id    = seqno;
1625     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1626 #if REMOTE_EVENT
1627     control_msg_tmp->ack_index    =  -1;
1628 #endif
1629 #if     USE_LRTS_MEMPOOL
1630     if(size < BIG_MSG)
1631     {
1632         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1633     }
1634     else
1635     {
1636         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1637         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1638         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1639     }
1640 #else
1641     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1642 #endif
1643     return control_msg_tmp;
1644 }
1645
1646 #define BLOCKING_SEND_CONTROL    0
1647
1648 // Large message, send control to receiver, receiver register memory and do a GET, 
1649 // return 1 - send no success
1650 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1651 {
1652     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1653     uint32_t            vmdh_index  = -1;
1654     int                 size;
1655     int                 offset = 0;
1656     uint64_t            source_addr;
1657     int                 register_size; 
1658     void                *msg;
1659
1660     size    =   control_msg_tmp->total_length;
1661     source_addr = control_msg_tmp->source_addr;
1662     register_size = control_msg_tmp->length;
1663
1664 #if  USE_LRTS_MEMPOOL
1665     if( control_msg_tmp->seq_id == 0 ){
1666 #if BLOCKING_SEND_CONTROL
1667         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1668             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1669                 LrtsAdvanceCommunication(0);
1670         }
1671 #endif
1672         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1673         {
1674             msg = (void*)source_addr;
1675             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1676             {
1677                 if(!inbuff)
1678                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1679                 return GNI_RC_ERROR_NOMEM;
1680             }
1681             //register the corresponding mempool
1682             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1683             if(status == GNI_RC_SUCCESS)
1684             {
1685                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1686             }
1687         }else
1688         {
1689             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1690             status = GNI_RC_SUCCESS;
1691         }
1692         if(NoMsgInSend(source_addr))
1693             register_size = GetMempoolsize((void*)(source_addr));
1694         else
1695             register_size = 0;
1696     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1697     {
1698         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1699         source_addr += offset;
1700         size = control_msg_tmp->length;
1701 #if BLOCKING_SEND_CONTROL
1702         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1703             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1704                 LrtsAdvanceCommunication(0);
1705         }
1706 #endif
1707         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1708             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1709             {
1710                 if(!inbuff)
1711                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1712                 return GNI_RC_ERROR_NOMEM;
1713             }
1714             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1715             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1716         }
1717         else
1718         {
1719             status = GNI_RC_SUCCESS;
1720         }
1721         register_size = 0;  
1722     }
1723
1724 #if CMI_EXERT_SEND_LARGE_CAP
1725     if(SEND_large_pending >= SEND_large_cap)
1726     {
1727         status = GNI_RC_ERROR_NOMEM;
1728     }
1729 #endif
1730  
1731     if(status == GNI_RC_SUCCESS)
1732     {
1733        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1734         if(status == GNI_RC_SUCCESS)
1735         {
1736 #if CMI_EXERT_SEND_LARGE_CAP
1737             SEND_large_pending++;
1738 #endif
1739             buffered_send_msg += register_size;
1740             if(control_msg_tmp->seq_id == 0)
1741             {
1742                 IncreaseMsgInSend(source_addr);
1743             }
1744             FreeControlMsg(control_msg_tmp);
1745             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1746         }else
1747             status = GNI_RC_ERROR_RESOURCE;
1748
1749     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1750     {
1751         CmiAbort("Memory registor for large msg\n");
1752     }else 
1753     {
1754         status = GNI_RC_ERROR_NOMEM; 
1755         if(!inbuff)
1756             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1757     }
1758     return status;
1759 #else
1760     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1761     if(status == GNI_RC_SUCCESS)
1762     {
1763         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1764         if(status == GNI_RC_SUCCESS)
1765         {
1766             FreeControlMsg(control_msg_tmp);
1767         }
1768     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1769     {
1770         CmiAbort("Memory registor for large msg\n");
1771     }else 
1772     {
1773         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1774     }
1775     return status;
1776 #endif
1777 }
1778
1779 inline void LrtsPrepareEnvelope(char *msg, int size)
1780 {
1781     CmiSetMsgSize(msg, size);
1782     CMI_SET_CHECKSUM(msg, size);
1783 }
1784
1785 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1786 {
1787     gni_return_t        status  =   GNI_RC_SUCCESS;
1788     uint8_t tag;
1789     CONTROL_MSG         *control_msg_tmp;
1790     int                 oob = ( mode & OUT_OF_BAND);
1791     SMSG_QUEUE          *queue;
1792
1793     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1794 #if CMK_USE_OOB
1795     queue = oob? &smsg_oob_queue : &smsg_queue;
1796 #else
1797     queue = &smsg_queue;
1798 #endif
1799
1800     LrtsPrepareEnvelope(msg, size);
1801
1802 #if PRINT_SYH
1803     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1804 #endif 
1805 #if CMK_SMP 
1806     if(size <= SMSG_MAX_MSG)
1807         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1808     else if (size < BIG_MSG) {
1809         control_msg_tmp =  construct_control_msg(size, msg, 0);
1810         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1811     }
1812     else {
1813           CmiSetMsgSeq(msg, 0);
1814           control_msg_tmp =  construct_control_msg(size, msg, 1);
1815           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1816     }
1817 #else   //non-smp, smp(worker sending)
1818     if(size <= SMSG_MAX_MSG)
1819     {
1820         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1821             CmiFree(msg);
1822     }
1823     else if (size < BIG_MSG) {
1824         control_msg_tmp =  construct_control_msg(size, msg, 0);
1825         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1826     }
1827     else {
1828 #if     USE_LRTS_MEMPOOL
1829         CmiSetMsgSeq(msg, 0);
1830         control_msg_tmp =  construct_control_msg(size, msg, 1);
1831         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1832 #else
1833         control_msg_tmp =  construct_control_msg(size, msg, 0);
1834         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1835 #endif
1836     }
1837 #endif
1838     return 0;
1839 }
1840
1841 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1842 {
1843   int i;
1844 #if CMK_BROADCAST_USE_CMIREFERENCE
1845   for(i=0;i<npes;i++) {
1846     if (pes[i] == CmiMyPe())
1847       CmiSyncSend(pes[i], len, msg);
1848     else {
1849       CmiReference(msg);
1850       CmiSyncSendAndFree(pes[i], len, msg);
1851     }
1852   }
1853 #else
1854   for(i=0;i<npes;i++) {
1855     CmiSyncSend(pes[i], len, msg);
1856   }
1857 #endif
1858 }
1859
1860 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1861 {
1862   /* A better asynchronous implementation may be wanted, but at least it works */
1863   CmiSyncListSendFn(npes, pes, len, msg);
1864   return (CmiCommHandle) 0;
1865 }
1866
1867 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1868 {
1869   if (npes == 1) {
1870       CmiSyncSendAndFree(pes[0], len, msg);
1871       return;
1872   }
1873 #if CMK_PERSISTENT_COMM
1874   if (CpvAccess(phs) && len > PERSIST_MIN_SIZE) {
1875       int i;
1876       for(i=0;i<npes;i++) {
1877         if (pes[i] == CmiMyPe())
1878           CmiSyncSend(pes[i], len, msg);
1879         else {
1880           CmiReference(msg);
1881           CmiSyncSendAndFree(pes[i], len, msg);
1882         }
1883       }
1884       CmiFree(msg);
1885       return;
1886   }
1887 #endif
1888   
1889 #if CMK_BROADCAST_USE_CMIREFERENCE
1890   CmiSyncListSendFn(npes, pes, len, msg);
1891   CmiFree(msg);
1892 #else
1893   int i;
1894   for(i=0;i<npes-1;i++) {
1895     CmiSyncSend(pes[i], len, msg);
1896   }
1897   if (npes>0)
1898     CmiSyncSendAndFree(pes[npes-1], len, msg);
1899   else 
1900     CmiFree(msg);
1901 #endif
1902 }
1903
1904 static void    PumpDatagramConnection();
1905 static      int         event_SetupConnect = 111;
1906 static      int         event_PumpSmsg = 222 ;
1907 static      int         event_PumpTransaction = 333;
1908 static      int         event_PumpRdmaTransaction = 444;
1909 static      int         event_SendBufferSmsg = 484;
1910 static      int         event_SendFmaRdmaMsg = 555;
1911 static      int         event_AdvanceCommunication = 666;
1912
1913 static void registerUserTraceEvents() {
1914 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1915     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1916     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1917     event_PumpTransaction = traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
1918     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA remote event" , -1);
1919     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1920     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1921     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1922 #endif
1923 }
1924
1925 static void ProcessDeadlock()
1926 {
1927     static CmiUInt8 *ptr = NULL;
1928     static CmiUInt8  last = 0, mysum, sum;
1929     static int count = 0;
1930     gni_return_t status;
1931     int i;
1932
1933 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1934 //sweep_mempool(CpvAccess(mempool));
1935     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1936     mysum = smsg_send_count + smsg_recv_count;
1937     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1938     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1939     GNI_RC_CHECK("PMI_Allgather", status);
1940     sum = 0;
1941     for (i=0; i<mysize; i++)  sum+= ptr[i];
1942     if (last == 0 || sum == last) 
1943         count++;
1944     else
1945         count = 0;
1946     last = sum;
1947     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1948     if (count == 2) { 
1949         /* detected twice, it is a real deadlock */
1950         if (myrank == 0)  {
1951             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1952             CmiAbort("Fatal> Deadlock detected.");
1953         }
1954
1955     }
1956     _detected_hang = 0;
1957 }
1958
1959 static void CheckProgress()
1960 {
1961     if (smsg_send_count == last_smsg_send_count &&
1962         smsg_recv_count == last_smsg_recv_count ) 
1963     {
1964         _detected_hang = 1;
1965 #if !CMK_SMP
1966         if (_detected_hang) ProcessDeadlock();
1967 #endif
1968
1969     }
1970     else {
1971         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1972         last_smsg_send_count = smsg_send_count;
1973         last_smsg_recv_count = smsg_recv_count;
1974         _detected_hang = 0;
1975     }
1976 }
1977
1978 static void set_limit()
1979 {
1980     //if (!user_set_flag && CmiMyRank() == 0) {
1981     if (CmiMyRank() == 0) {
1982         int mynode = CmiPhysicalNodeID(CmiMyPe());
1983         int numpes = CmiNumPesOnPhysicalNode(mynode);
1984         int numprocesses = numpes / CmiMyNodeSize();
1985         MAX_REG_MEM  = _totalmem / numprocesses;
1986         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1987         if (CmiMyPe() == 0)
1988            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1989         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1990         {
1991              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1992              CmiAbort("memory registration\n");
1993         }
1994     }
1995 }
1996
1997 void LrtsPostCommonInit(int everReturn)
1998 {
1999 #if CMK_DIRECT
2000     CmiDirectInit();
2001 #endif
2002 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
2003     CpvInitialize(double, projTraceStart);
2004     /* only PE 0 needs to care about registration (to generate sts file). */
2005     //if (CmiMyPe() == 0) 
2006     {
2007         registerMachineUserEventsFunction(&registerUserTraceEvents);
2008     }
2009 #endif
2010
2011 #if CMK_SMP
2012     CmiIdleState *s=CmiNotifyGetState();
2013     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
2014     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
2015 #else
2016     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
2017     if (useDynamicSMSG)
2018     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
2019 #endif
2020
2021 #if ! LARGEPAGE
2022     if (_checkProgress)
2023 #if CMK_SMP
2024     if (CmiMyRank() == 0)
2025 #endif
2026     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
2027 #endif
2028  
2029 #if !LARGEPAGE
2030     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2031 #endif
2032 }
2033
2034 /* this is called by worker thread */
2035 void LrtsPostNonLocal(){
2036
2037 #if CMK_WORKER_SINGLE_TASK
2038     if (CmiMyRank() % 5 == 1)
2039 #endif
2040     {
2041         traceEndIdle();
2042 #if REMOTE_EVENT
2043         RECV_PERSISTENT
2044 #endif
2045          SEND_PERSISTENT
2046          traceBeginIdle();
2047     }
2048 #if 0
2049 #if CMK_SMP_TRACE_COMMTHREAD
2050     double startT, endT;
2051 #endif
2052
2053 #if MULTI_THREAD_SEND
2054     if(mysize == 1) return;
2055
2056 #if CMK_SMP_TRACE_COMMTHREAD
2057     traceEndIdle();
2058     startT = CmiWallTimer();
2059 #endif
2060
2061 #if CMK_WORKER_SINGLE_TASK
2062     if (CmiMyRank() % 6 == 0)
2063 #endif
2064     PumpNetworkSmsg();
2065
2066 #if CMK_WORKER_SINGLE_TASK
2067     if (CmiMyRank() % 6 == 1)
2068 #endif
2069     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2070
2071 #if CMK_WORKER_SINGLE_TASK
2072     if (CmiMyRank() % 6 == 2)
2073 #endif
2074     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
2075
2076 #if REMOTE_EVENT
2077 #if CMK_WORKER_SINGLE_TASK
2078     if (CmiMyRank() % 6 == 3)
2079 #endif
2080     PumpRemoteTransactions(rdma_rx_cqh);         // rdma_rx_cqh
2081 #endif
2082
2083 #if CMK_WORKER_SINGLE_TASK
2084     if (CmiMyRank() % 6 == 4)
2085 #endif
2086     {
2087 #if CMK_USE_OOB
2088     SendBufferMsg(&smsg_oob_queue, NULL);
2089     SendBufferMsg(&smsg_queue, &smsg_oob_queue);
2090 #else
2091     SendBufferMsg(&smsg_queue, NULL);
2092 #endif
2093     }
2094
2095 #if CMK_WORKER_SINGLE_TASK
2096     if (CmiMyRank() % 6 == 5)
2097 #endif
2098 #if CMK_SMP
2099     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
2100 #else
2101     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
2102 #endif
2103
2104 #if CMK_SMP_TRACE_COMMTHREAD
2105     endT = CmiWallTimer();
2106     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2107     traceBeginIdle();
2108 #endif
2109
2110 #endif
2111 #endif
2112 }
2113
2114 /* useDynamicSMSG */
2115 static void    PumpDatagramConnection()
2116 {
2117     uint32_t          remote_address;
2118     uint32_t          remote_id;
2119     gni_return_t status;
2120     gni_post_state_t  post_state;
2121     uint64_t          datagram_id;
2122     int i;
2123
2124    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2125    {
2126        if (datagram_id >= mysize) {           /* bound endpoint */
2127            int pe = datagram_id - mysize;
2128            CMI_GNI_LOCK(global_gni_lock)
2129            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2130            CMI_GNI_UNLOCK(global_gni_lock)
2131            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2132            {
2133                CmiAssert(remote_id == pe);
2134                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2135                GNI_RC_CHECK("Dynamic SMSG Init", status);
2136 #if PRINT_SYH
2137                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2138 #endif
2139                CmiAssert(smsg_connected_flag[pe] == 1);
2140                smsg_connected_flag[pe] = 2;
2141            }
2142        }
2143        else {         /* unbound ep */
2144            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2145            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2146            {
2147                CmiAssert(remote_id<mysize);
2148                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2149                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2150                GNI_RC_CHECK("Dynamic SMSG Init", status);
2151 #if PRINT_SYH
2152                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2153 #endif
2154                smsg_connected_flag[remote_id] = 2;
2155
2156                alloc_smsg_attr(&send_smsg_attr);
2157                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2158                GNI_RC_CHECK("post unbound datagram", status);
2159            }
2160        }
2161    }
2162 }
2163
2164 /* pooling CQ to receive network message */
2165 static void PumpNetworkRdmaMsgs()
2166 {
2167     gni_cq_entry_t      event_data;
2168     gni_return_t        status;
2169
2170 }
2171
2172 #if CMK_SMP
2173 inline 
2174 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2175 {
2176     RDMA_REQUEST        *rdma_request_msg;
2177     MallocRdmaRequest(rdma_request_msg);
2178     rdma_request_msg->destNode = inst_id;
2179     rdma_request_msg->pd = pd;
2180 #if REMOTE_EVENT
2181     rdma_request_msg->ack_index = ack_index;
2182 #endif
2183     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2184 }
2185
2186 #else
2187 inline 
2188 static void bufferRdmaMsg(RDMA_REQUEST *bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2189 {
2190     RDMA_REQUEST        *rdma_request_msg;
2191     MallocRdmaRequest(rdma_request_msg);
2192     rdma_request_msg->destNode = inst_id;
2193     rdma_request_msg->pd = pd;
2194 #if REMOTE_EVENT
2195     rdma_request_msg->ack_index = ack_index;
2196 #endif
2197     if(bufferqueue == 0)
2198     {
2199         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2200     }else{
2201         sendRdmaTail->next = rdma_request_msg;
2202         sendRdmaTail =  rdma_request_msg;
2203     }
2204 }
2205
2206 #endif
2207
2208 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2209
2210 static void PumpNetworkSmsg()
2211 {
2212     uint64_t            inst_id;
2213     gni_cq_entry_t      event_data;
2214     gni_return_t        status;
2215     void                *header;
2216     uint8_t             msg_tag;
2217     int                 msg_nbytes;
2218     void                *msg_data;
2219     gni_mem_handle_t    msg_mem_hndl;
2220     gni_smsg_attr_t     *smsg_attr;
2221     gni_smsg_attr_t     *remote_smsg_attr;
2222     int                 init_flag;
2223     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2224     uint64_t            source_addr;
2225     SMSG_QUEUE         *queue = &smsg_queue;
2226 #if  CMK_DIRECT
2227     cmidirectMsg        *direct_msg;
2228 #endif
2229 #if CMI_PUMPNETWORKSMSG_CAP 
2230     int                  recv_cnt = 0;
2231     while(recv_cnt< PumpNetworkSmsg_cap) {
2232 #else
2233     while(1) {
2234 #endif
2235         CMI_GNI_LOCK(smsg_rx_cq_lock)
2236         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2237         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2238         if(status != GNI_RC_SUCCESS) break;
2239
2240         inst_id = GNI_CQ_GET_INST_ID(event_data);
2241 #if REMOTE_EVENT
2242         inst_id = GET_RANK(inst_id);      /* important */
2243 #endif
2244         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2245 #if PRINT_SYH
2246         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2247 #endif
2248         if (useDynamicSMSG) {
2249             /* subtle: smsg may come before connection is setup */
2250             while (smsg_connected_flag[inst_id] != 2) 
2251                PumpDatagramConnection();
2252         }
2253         msg_tag = GNI_SMSG_ANY_TAG;
2254         while(1) {
2255             CMI_GNI_LOCK(smsg_mailbox_lock)
2256             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2257             if (status != GNI_RC_SUCCESS)
2258             {
2259                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2260                 break;
2261             }
2262 #if         CMI_PUMPNETWORKSMSG_CAP
2263             recv_cnt++; 
2264 #endif
2265 #if PRINT_SYH
2266             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2267 #endif
2268             /* copy msg out and then put into queue (small message) */
2269             switch (msg_tag) {
2270             case SMALL_DATA_TAG:
2271             {
2272                 START_EVENT();
2273                 msg_nbytes = CmiGetMsgSize(header);
2274                 msg_data    = CmiAlloc(msg_nbytes);
2275                 memcpy(msg_data, (char*)header, msg_nbytes);
2276                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2277                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2278                 TRACE_COMM_CREATION(EVENT_TIME(), msg_data);
2279                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2280                 handleOneRecvedMsg(msg_nbytes, msg_data);
2281                 break;
2282             }
2283             case LMSG_INIT_TAG:
2284             {
2285 #if MULTI_THREAD_SEND
2286                 MallocControlMsg(control_msg_tmp);
2287                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2288                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2289                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2290                 getLargeMsgRequest(control_msg_tmp, inst_id);
2291                 FreeControlMsg(control_msg_tmp);
2292 #else
2293                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2294                 getLargeMsgRequest(header, inst_id);
2295                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2296 #endif
2297                 break;
2298             }
2299 #if !REMOTE_EVENT && !CQWRITE
2300             case ACK_TAG:   //msg fit into mempool
2301             {
2302                 /* Get is done, release message . Now put is not used yet*/
2303                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2304                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2305                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2306 #if ! USE_LRTS_MEMPOOL
2307                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2308 #else
2309                 DecreaseMsgInSend(msg);
2310 #endif
2311                 if(NoMsgInSend(msg))
2312                     buffered_send_msg -= GetMempoolsize(msg);
2313                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2314                 CmiFree(msg);
2315 #if CMI_EXERT_SEND_LARGE_CAP
2316                 SEND_large_pending--;
2317 #endif
2318                 break;
2319             }
2320 #endif
2321             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2322             {
2323 #if MULTI_THREAD_SEND
2324                 MallocControlMsg(header_tmp);
2325                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2326                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2327 #else
2328                 header_tmp = (CONTROL_MSG *) header;
2329 #endif
2330                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2331 #if CMI_EXERT_SEND_LARGE_CAP
2332                 SEND_large_pending--;
2333 #endif
2334                 void *msg = (void*)(header_tmp->source_addr);
2335                 int cur_seq = CmiGetMsgSeq(msg);
2336                 int offset = ONE_SEG*(cur_seq+1);
2337                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2338                 buffered_send_msg -= header_tmp->length;
2339                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2340                 if (remain_size < 0) remain_size = 0;
2341                 CmiSetMsgSize(msg, remain_size);
2342                 if(remain_size <= 0) //transaction done
2343                 {
2344                     CmiFree(msg);
2345                 }else if (header_tmp->total_length > offset)
2346                 {
2347                     CmiSetMsgSeq(msg, cur_seq+1);
2348                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2349                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2350                     //send next seg
2351                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2352                          // pipelining
2353                     if (header_tmp->seq_id == 1) {
2354                       int i;
2355                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2356                         int seq = cur_seq+i+2;
2357                         CmiSetMsgSeq(msg, seq-1);
2358                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2359                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2360                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2361                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2362                       }
2363                     }
2364                 }
2365 #if MULTI_THREAD_SEND
2366                 FreeControlMsg(header_tmp);
2367 #else
2368                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2369 #endif
2370                 break;
2371             }
2372 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2373             case PUT_DONE_TAG:  {   //persistent message
2374                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2375                 int size = ((CONTROL_MSG *) header)->length;
2376                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2377                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2378                 CmiReference(msg);
2379                 CMI_CHECK_CHECKSUM(msg, size);
2380                 handleOneRecvedMsg(size, msg); 
2381 #if PRINT_SYH
2382                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2383 #endif
2384                 break;
2385             }
2386 #endif
2387 #if CMK_DIRECT
2388             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2389                 //create a trigger message
2390                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2391                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2392                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2393                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2394                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2395                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2396                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2397                 break;
2398 #endif
2399             default:
2400                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2401                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2402                 printf("weird tag problem\n");
2403                 CmiAbort("Unknown tag\n");
2404             }               // end switch
2405 #if PRINT_SYH
2406             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2407 #endif
2408             smsg_recv_count ++;
2409             msg_tag = GNI_SMSG_ANY_TAG;
2410         } //endwhile GNI_SmsgGetNextWTag
2411     }   //end while GetEvent
2412     if(status == GNI_RC_ERROR_RESOURCE)
2413     {
2414         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2415         GNI_RC_CHECK("Smsg_rx_cq full", status);
2416     }
2417 }
2418
2419 static void printDesc(gni_post_descriptor_t *pd)
2420 {
2421     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2422 }
2423
2424 #if CQWRITE
2425 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2426 {
2427     gni_post_descriptor_t *pd;
2428     gni_return_t        status = GNI_RC_SUCCESS;
2429     
2430     MallocPostDesc(pd);
2431     pd->type = GNI_POST_CQWRITE;
2432     pd->cq_mode = GNI_CQMODE_SILENT;
2433     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2434     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2435     pd->cqwrite_value = data;
2436     pd->remote_mem_hndl = mem_hndl;
2437     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2438     GNI_RC_CHECK("GNI_PostCqWrite", status);
2439 }
2440 #endif
2441
2442 // register memory for a message
2443 // return mem handle
2444 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2445 {
2446     gni_return_t status = GNI_RC_SUCCESS;
2447
2448     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2449
2450 #if CMK_PERSISTENT_COMM
2451       // persistent message is always registered
2452       // BIG_MSG small pieces do not have malloc chunk header
2453     if (IS_PERSISTENT_MEMORY(msg)) {
2454         *memh = GetMemHndl(msg);
2455         return GNI_RC_SUCCESS;
2456     }
2457 #endif
2458     if(seqno == 0 
2459 #if CMK_PERSISTENT_COMM
2460          || seqno == PERSIST_SEQ
2461 #endif
2462       )
2463     {
2464         if(IsMemHndlZero((GetMemHndl(msg))))
2465         {
2466             msg = (void*)(msg);
2467             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2468             if(status == GNI_RC_SUCCESS)
2469                 *memh = GetMemHndl(msg);
2470         }
2471         else {
2472             *memh = GetMemHndl(msg);
2473         }
2474     }
2475     else {
2476         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2477         status = registerMemory(msg, size, memh, NULL); 
2478     }
2479     return status;
2480 }
2481
2482 // for BIG_MSG called on receiver side for receiving control message
2483 // LMSG_INIT_TAG
2484 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2485 {
2486 #if     USE_LRTS_MEMPOOL
2487     CONTROL_MSG         *request_msg;
2488     gni_return_t        status = GNI_RC_SUCCESS;
2489     void                *msg_data;
2490     gni_post_descriptor_t *pd;
2491     gni_mem_handle_t    msg_mem_hndl;
2492     int                 size, transaction_size, offset = 0;
2493     size_t              register_size = 0;
2494
2495     // initial a get to transfer data from the sender side */
2496     request_msg = (CONTROL_MSG *) header;
2497     size = request_msg->total_length;
2498     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2499     MallocPostDesc(pd);
2500 #if CMK_WITH_STATS 
2501     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2502 #endif
2503     if(request_msg->seq_id < 2)   {
2504 #if CMK_SMP_TRACE_COMMTHREAD 
2505         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2506 #endif
2507         msg_data = CmiAlloc(size);
2508         CmiSetMsgSeq(msg_data, 0);
2509         _MEMCHECK(msg_data);
2510     }
2511     else {
2512         offset = ONE_SEG*(request_msg->seq_id-1);
2513         msg_data = (char*)request_msg->dest_addr + offset;
2514     }
2515    
2516     pd->cqwrite_value = request_msg->seq_id;
2517
2518     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2519     SetMemHndlZero(pd->local_mem_hndl);
2520     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2521     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2522         if(NoMsgInRecv( (void*)(msg_data)))
2523             register_size = GetMempoolsize((void*)(msg_data));
2524     }
2525
2526     pd->first_operand = ALIGN64(size);                   //  total length
2527
2528     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2529         pd->type            = GNI_POST_FMA_GET;
2530     else
2531         pd->type            = GNI_POST_RDMA_GET;
2532     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2533     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2534     pd->length          = transaction_size;
2535     pd->local_addr      = (uint64_t) msg_data;
2536     pd->remote_addr     = request_msg->source_addr + offset;
2537     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2538 #if MULTI_THREAD_SEND
2539     pd->src_cq_hndl     = rdma_tx_cqh;
2540 #else
2541     pd->src_cq_hndl     = 0; 
2542 #endif
2543     pd->rdma_mode       = 0;
2544     pd->amo_cmd         = 0;
2545 #if CMI_EXERT_RECV_RDMA_CAP
2546     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2547 #endif
2548     //memory registration success
2549     if(status == GNI_RC_SUCCESS )
2550     {
2551         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2552         CMI_GNI_LOCK(lock)
2553 #if REMOTE_EVENT
2554         if( request_msg->seq_id == 0)
2555         {
2556             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2557             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2558             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2559         }
2560 #endif
2561
2562 #if CMK_WITH_STATS
2563         RDMA_TRY_SEND(pd->type)
2564 #endif
2565         if(pd->type == GNI_POST_RDMA_GET) 
2566         {
2567             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2568         }
2569         else
2570         {
2571             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2572         }
2573         CMI_GNI_UNLOCK(lock)
2574
2575         if(status == GNI_RC_SUCCESS )
2576         {
2577 #if CMI_EXERT_RECV_RDMA_CAP
2578             RDMA_pending++;
2579 #endif
2580             if(pd->cqwrite_value == 0)
2581             {
2582 #if MACHINE_DEBUG_LOG
2583                 buffered_recv_msg += register_size;
2584                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2585 #endif
2586                 IncreaseMsgInRecv(msg_data);
2587 #if CMK_SMP_TRACE_COMMTHREAD 
2588                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2589 #endif
2590             }
2591 #if  CMK_WITH_STATS
2592             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2593             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2594 #endif
2595         }
2596     }else
2597     {
2598         SetMemHndlZero((pd->local_mem_hndl));
2599     }
2600     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2601     {
2602 #if REMOTE_EVENT
2603         bufferRdmaMsg(sendRdmaBuf, inst_id, pd, request_msg->ack_index); 
2604 #else
2605         bufferRdmaMsg(sendRdmaBuf, inst_id, pd, -1); 
2606 #endif
2607     }else if (status != GNI_RC_SUCCESS) {
2608         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2609         GNI_RC_CHECK("GetLargeAFter posting", status);
2610     }
2611 #else
2612     CONTROL_MSG         *request_msg;
2613     gni_return_t        status;
2614     void                *msg_data;
2615     gni_post_descriptor_t *pd;
2616     RDMA_REQUEST        *rdma_request_msg;
2617     gni_mem_handle_t    msg_mem_hndl;
2618     //int source;
2619     // initial a get to transfer data from the sender side */
2620     request_msg = (CONTROL_MSG *) header;
2621     msg_data = CmiAlloc(request_msg->length);
2622     _MEMCHECK(msg_data);
2623
2624     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2625
2626     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2627     {
2628         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2629     }
2630
2631     MallocPostDesc(pd);
2632     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2633         pd->type            = GNI_POST_FMA_GET;
2634     else
2635         pd->type            = GNI_POST_RDMA_GET;
2636     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2637     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2638     pd->length          = ALIGN64(request_msg->length);
2639     pd->local_addr      = (uint64_t) msg_data;
2640     pd->remote_addr     = request_msg->source_addr;
2641     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2642 #if MULTI_THREAD_SEND
2643     pd->src_cq_hndl     = rdma_tx_cqh;
2644 #else
2645     pd->src_cq_hndl     = 0;
2646 #endif
2647     pd->rdma_mode       = 0;
2648     pd->amo_cmd         = 0;
2649
2650     //memory registration successful
2651     if(status == GNI_RC_SUCCESS)
2652     {
2653         pd->local_mem_hndl  = msg_mem_hndl;
2654        
2655         if(pd->type == GNI_POST_RDMA_GET) 
2656         {
2657             CMI_GNI_LOCK(rdma_tx_cq_lock)
2658             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2659             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2660         }
2661         else
2662         {
2663             CMI_GNI_LOCK(default_tx_cq_lock)
2664             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2665             CMI_GNI_UNLOCK(default_tx_cq_lock)
2666         }
2667
2668     }else
2669     {
2670         SetMemHndlZero(pd->local_mem_hndl);
2671     }
2672     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2673     {
2674         MallocRdmaRequest(rdma_request_msg);
2675         rdma_request_msg->next = 0;
2676         rdma_request_msg->destNode = inst_id;
2677         rdma_request_msg->pd = pd;
2678         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2679     }else {
2680         GNI_RC_CHECK("AFter posting", status);
2681     }
2682 #endif
2683 }
2684
2685 #if CQWRITE
2686 static void PumpCqWriteTransactions()
2687 {
2688
2689     gni_cq_entry_t          ev;
2690     gni_return_t            status;
2691     void                    *msg;  
2692     int                     msg_size;
2693     while(1) {
2694         //CMI_GNI_LOCK(my_cq_lock) 
2695         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2696         //CMI_GNI_UNLOCK(my_cq_lock)
2697         if(status != GNI_RC_SUCCESS) break;
2698         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2699 #if CMK_PERSISTENT_COMM
2700 #if PRINT_SYH
2701         printf(" %d CQ write event %p\n", myrank, msg);
2702 #endif
2703         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2704 #if PRINT_SYH
2705             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2706 #endif
2707             CmiReference(msg);
2708             msg_size = CmiGetMsgSize(msg);
2709             CMI_CHECK_CHECKSUM(msg, msg_size);
2710             handleOneRecvedMsg(msg_size, msg); 
2711             continue;
2712         }
2713 #endif
2714 #if ! USE_LRTS_MEMPOOL
2715        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2716 #else
2717         DecreaseMsgInSend(msg);
2718 #endif
2719         if(NoMsgInSend(msg))
2720             buffered_send_msg -= GetMempoolsize(msg);
2721         CmiFree(msg);
2722     };
2723     if(status == GNI_RC_ERROR_RESOURCE)
2724     {
2725         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2726     }
2727 }
2728 #endif
2729
2730 #if REMOTE_EVENT
2731 static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
2732 {
2733     gni_cq_entry_t          ev;
2734     gni_return_t            status;
2735     void                    *msg;   
2736     int                     inst_id, index, type, size;
2737
2738     while(1) {
2739         CMI_GNI_LOCK(global_gni_lock)
2740 //        CMI_GNI_LOCK(rdma_tx_cq_lock)
2741         status = GNI_CqGetEvent(rx_cqh, &ev);
2742 //        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2743         CMI_GNI_UNLOCK(global_gni_lock)
2744
2745         if(status != GNI_RC_SUCCESS) break;
2746
2747         inst_id = GNI_CQ_GET_INST_ID(ev);
2748         index = GET_INDEX(inst_id);
2749         type = GET_TYPE(inst_id);
2750         switch (type) {
2751         case 0:    // ACK
2752             CmiAssert(index>=0 && index<ackPool.size);
2753             CMI_GNI_LOCK(ackPool.lock);
2754             CmiAssert(GetIndexType(ackPool, index) == 1);
2755             msg = GetIndexAddress(ackPool, index);
2756             CMI_GNI_UNLOCK(ackPool.lock);
2757 #if PRINT_SYH
2758             printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2759 #endif
2760 #if ! USE_LRTS_MEMPOOL
2761            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2762 #else
2763             DecreaseMsgInSend(msg);
2764 #endif
2765             if(NoMsgInSend(msg))
2766                 buffered_send_msg -= GetMempoolsize(msg);
2767             CmiFree(msg);
2768             IndexPool_freeslot(&ackPool, index);
2769 #if CMI_EXERT_SEND_LARGE_CAP
2770             SEND_large_pending--;
2771 #endif
2772             break;
2773 #if CMK_PERSISTENT_COMM
2774         case 1:  {    // PERSISTENT
2775             CmiLock(persistPool.lock);
2776             CmiAssert(GetIndexType(persistPool, index) == 2);
2777             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2778             CmiUnlock(persistPool.lock);
2779             START_EVENT();
2780             msg = slot->destBuf[0].destAddress;
2781             size = CmiGetMsgSize(msg);
2782             CmiReference(msg);
2783             CMI_CHECK_CHECKSUM(msg, size);
2784             TRACE_COMM_CREATION(EVENT_TIME(), msg);
2785             handleOneRecvedMsg(size, msg); 
2786             break;
2787             }
2788 #endif
2789         default:
2790             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2791             CmiAbort("PumpRemoteTransactions: unknown type");
2792         }
2793     }
2794     if(status == GNI_RC_ERROR_RESOURCE)
2795     {
2796         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2797     }
2798 }
2799 #endif
2800
2801 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2802 {
2803     gni_cq_entry_t          ev;
2804     gni_return_t            status;
2805     uint64_t                type, inst_id;
2806     gni_post_descriptor_t   *tmp_pd;
2807     MSG_LIST                *ptr;
2808     CONTROL_MSG             *ack_msg_tmp;
2809     ACK_MSG                 *ack_msg;
2810     uint8_t                 msg_tag;
2811 #if CMK_DIRECT
2812     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2813 #endif
2814     SMSG_QUEUE         *queue = &smsg_queue;
2815
2816     while(1) {
2817         CMI_GNI_LOCK(my_cq_lock) 
2818         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2819         CMI_GNI_UNLOCK(my_cq_lock)
2820         if(status != GNI_RC_SUCCESS) break;
2821         
2822         type = GNI_CQ_GET_TYPE(ev);
2823         if (type == GNI_CQ_EVENT_TYPE_POST)
2824         {
2825
2826 #if CMI_EXERT_RECV_RDMA_CAP
2827             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2828             RDMA_pending--;
2829 #endif
2830             inst_id     = GNI_CQ_GET_INST_ID(ev);
2831 #if PRINT_SYH
2832             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2833 #endif
2834             CMI_GNI_LOCK(my_cq_lock)
2835             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2836             CMI_GNI_UNLOCK(my_cq_lock)
2837
2838             switch (tmp_pd->type) {
2839 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2840             case GNI_POST_RDMA_PUT:
2841 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2842                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2843 #endif
2844             case GNI_POST_FMA_PUT:
2845                 if(tmp_pd->amo_cmd == 1) {
2846 #if CMK_DIRECT
2847                     //sender ACK to receiver to trigger it is done
2848                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2849                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2850                     msg_tag = DIRECT_PUT_DONE_TAG;
2851 #endif
2852                 }
2853                 else {
2854                     CmiFree((void *)tmp_pd->local_addr);
2855 #if REMOTE_EVENT
2856                     FreePostDesc(tmp_pd);
2857                     continue;
2858 #elif CQWRITE
2859                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2860                     FreePostDesc(tmp_pd);
2861                     continue;
2862 #else
2863                     MallocControlMsg(ack_msg_tmp);
2864                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2865                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2866                     ack_msg_tmp->length  = tmp_pd->length;
2867                     msg_tag = PUT_DONE_TAG;
2868 #endif
2869                 }
2870                 break;
2871 #endif
2872             case GNI_POST_RDMA_GET:
2873             case GNI_POST_FMA_GET:  {
2874 #if  ! USE_LRTS_MEMPOOL
2875                 MallocControlMsg(ack_msg_tmp);
2876                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2877                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2878                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2879                 msg_tag = ACK_TAG;  
2880 #else
2881 #if CMK_WITH_STATS
2882                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2883 #endif
2884                 int seq_id = tmp_pd->cqwrite_value;
2885                 if(seq_id > 0)      // BIG_MSG
2886                 {
2887                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2888                     MallocControlMsg(ack_msg_tmp);
2889                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2890                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2891                     ack_msg_tmp->seq_id = seq_id;
2892                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2893                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2894                     ack_msg_tmp->length = tmp_pd->length;
2895                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2896                     msg_tag = BIG_MSG_TAG; 
2897                 } 
2898                 else
2899                 {
2900                     msg_tag = ACK_TAG; 
2901 #if  !REMOTE_EVENT && !CQWRITE
2902                     MallocAckMsg(ack_msg);
2903                     ack_msg->source_addr = tmp_pd->remote_addr;
2904 #endif
2905                 }
2906 #endif
2907                 break;
2908             }
2909             case  GNI_POST_CQWRITE:
2910                    FreePostDesc(tmp_pd);
2911                    continue;
2912             default:
2913                 CmiPrintf("type=%d\n", tmp_pd->type);
2914                 CmiAbort("PumpLocalTransactions: unknown type!");
2915             }      /* end of switch */
2916
2917 #if CMK_DIRECT
2918             if (tmp_pd->amo_cmd == 1) {
2919                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2920                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2921             }
2922             else
2923 #endif
2924             if (msg_tag == ACK_TAG) {
2925 #if !REMOTE_EVENT
2926 #if   !CQWRITE
2927                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2928                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2929 #else
2930                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2931 #endif
2932 #endif
2933             }
2934             else {
2935                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2936                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2937             }
2938 #if CMK_PERSISTENT_COMM
2939             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2940 #endif
2941             {
2942                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2943 #if PRINT_SYH
2944                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2945 #endif
2946                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2947                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2948
2949                     START_EVENT();
2950                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2951                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2952 #if MACHINE_DEBUG_LOG
2953                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2954                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2955                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2956 #endif
2957                     TRACE_COMM_CREATION(EVENT_TIME(), (void*)tmp_pd->local_addr);
2958                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2959                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2960                 }else if(msg_tag == BIG_MSG_TAG){
2961                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2962                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2963                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2964                         START_EVENT();
2965 #if PRINT_SYH
2966                         printf("Pipeline msg done [%d]\n", myrank);
2967 #endif
2968 #if     CMK_SMP_TRACE_COMMTHREAD
2969                         if( tmp_pd->cqwrite_value == 1)
2970                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2971 #endif
2972                         TRACE_COMM_CREATION(EVENT_TIME(), msg);
2973                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2974                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2975                     }
2976                 }
2977             }
2978             FreePostDesc(tmp_pd);
2979         }
2980     } //end while
2981     if(status == GNI_RC_ERROR_RESOURCE)
2982     {
2983         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2984         GNI_RC_CHECK("Smsg_tx_cq full", status);
2985     }
2986 }
2987
2988 #if CMK_SMP
2989
2990 static void  SendRdmaMsg( PCQueue sendqueue)
2991 {
2992     gni_return_t            status = GNI_RC_SUCCESS;
2993     gni_mem_handle_t        msg_mem_hndl;
2994     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2995     RDMA_REQUEST            *pre = 0;
2996     uint64_t                register_size = 0;
2997     void                    *msg;
2998     int                     i;
2999     int len = PCQueueLength(sendqueue);
3000     for (i=0; i<len; i++)
3001     {
3002 #if CMI_EXERT_RECV_RDMA_CAP
3003         if( RDMA_pending >= RDMA_cap) break;
3004 #endif
3005         CMI_PCQUEUEPOP_LOCK( sendqueue)
3006         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
3007         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
3008         if (ptr == NULL) break;
3009         
3010         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
3011         gni_post_descriptor_t *pd = ptr->pd;
3012         
3013         msg = (void*)(pd->local_addr);
3014         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
3015         register_size = 0;
3016         if(pd->cqwrite_value == 0) {
3017             if(NoMsgInRecv(msg))
3018                 register_size = GetMempoolsize(msg);
3019         }
3020
3021         if(status == GNI_RC_SUCCESS)        //mem register good
3022         {
3023             int destNode = ptr->destNode;
3024             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
3025             CMI_GNI_LOCK(lock);
3026 #if REMOTE_EVENT
3027             if( pd->cqwrite_value == 0) {
3028                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3029                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
3030                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3031             }
3032 #if CMK_PERSISTENT_COMM
3033             else if (pd->cqwrite_value == PERSIST_SEQ) {
3034                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3035                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
3036                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3037             }
3038 #endif
3039 #endif
3040 #if CMK_WITH_STATS
3041             RDMA_TRY_SEND(pd->type)
3042 #endif
3043 #if CMK_SMP_TRACE_COMMTHREAD
3044             if(IS_PUT(pd->type))
3045             {
3046                  START_EVENT();
3047                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
3048             }
3049 #endif
3050
3051             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
3052             {
3053                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3054             }
3055             else
3056             {
3057                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3058             }
3059             CMI_GNI_UNLOCK(lock);
3060             
3061             if(status == GNI_RC_SUCCESS)    //post good
3062             {
3063 #if CMI_EXERT_RECV_RDMA_CAP
3064                 RDMA_pending ++;
3065 #endif
3066                 if(pd->cqwrite_value == 0)
3067                 {
3068 #if CMK_SMP_TRACE_COMMTHREAD 
3069                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3070 #endif
3071                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3072                 }
3073 #if  CMK_WITH_STATS
3074                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3075                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3076 #endif
3077 #if MACHINE_DEBUG_LOG
3078                 buffered_recv_msg += register_size;
3079                 MACHSTATE(8, "GO request from buffered\n"); 
3080 #endif
3081 #if PRINT_SYH
3082                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
3083 #endif
3084             }else           // cannot post
3085             {
3086                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3087 #if PRINT_SYH
3088                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3089 #endif
3090                 break;
3091             }
3092         } else          //memory registration fails
3093         {
3094             PCQueuePush(sendqueue, (char*)ptr);
3095         }
3096     } //end while
3097 }
3098
3099
3100 #else
3101 static void  SendRdmaMsg()
3102 {
3103     gni_return_t            status = GNI_RC_SUCCESS;
3104     gni_mem_handle_t        msg_mem_hndl;
3105     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
3106     RDMA_REQUEST            *pre = 0;
3107     uint64_t                register_size = 0;
3108     void                    *msg;
3109     int                     i;
3110     ptr = sendRdmaBuf;
3111     while (ptr!=0 )
3112     {
3113 #if CMI_EXERT_RECV_RDMA_CAP
3114          if( RDMA_pending >= RDMA_cap) break;
3115 #endif
3116         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
3117         gni_post_descriptor_t *pd = ptr->pd;
3118         
3119         msg = (void*)(pd->local_addr);
3120         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
3121         register_size = 0;
3122         if(pd->cqwrite_value == 0) {
3123             if(NoMsgInRecv(msg))
3124                 register_size = GetMempoolsize(msg);
3125         }
3126
3127         if(status == GNI_RC_SUCCESS)        //mem register good
3128         {
3129             int destNode = ptr->destNode;
3130             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
3131             CMI_GNI_LOCK(lock);
3132 #if REMOTE_EVENT
3133             if( pd->cqwrite_value == 0) {
3134                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3135                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
3136                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3137             }
3138 #if CMK_PERSISTENT_COMM
3139             else if (pd->cqwrite_value == PERSIST_SEQ) {
3140                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3141                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
3142                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3143             }
3144 #endif
3145 #endif
3146 #if CMK_WITH_STATS
3147             RDMA_TRY_SEND(pd->type)
3148 #endif
3149 #if CMK_SMP_TRACE_COMMTHREAD
3150             if(IS_PUT(pd->type))
3151             {
3152                  START_EVENT();
3153                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
3154             }
3155 #endif
3156
3157             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
3158             {
3159                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3160             }
3161             else
3162             {
3163                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3164             }
3165             CMI_GNI_UNLOCK(lock);
3166             
3167             if(status == GNI_RC_SUCCESS)    //post good
3168             {
3169 #if CMI_EXERT_RECV_RDMA_CAP
3170                 RDMA_pending ++;
3171 #endif
3172                 tmp_ptr = ptr;
3173                 if(pre != 0) {
3174                     pre->next = ptr->next;
3175                 }
3176                 else {
3177                     sendRdmaBuf = ptr->next;
3178                 }
3179                 ptr = ptr->next;
3180                 FreeRdmaRequest(tmp_ptr);
3181                 if(pd->cqwrite_value == 0)
3182                 {
3183 #if CMK_SMP_TRACE_COMMTHREAD 
3184                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3185 #endif
3186                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3187                 }
3188 #if  CMK_WITH_STATS
3189                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3190                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3191 #endif
3192 #if MACHINE_DEBUG_LOG
3193                 buffered_recv_msg += register_size;
3194                 MACHSTATE(8, "GO request from buffered\n"); 
3195 #endif
3196 #if PRINT_SYH
3197                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
3198 #endif
3199             }else           // cannot post
3200             {
3201                 pre = ptr;
3202                 ptr = ptr->next;
3203 #if PRINT_SYH
3204                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3205 #endif
3206                 break;
3207             }
3208         } else          //memory registration fails
3209         {
3210             pre = ptr;
3211             ptr = ptr->next;
3212         }
3213     } //end while
3214     if(ptr == 0)
3215         sendRdmaTail = pre;
3216 }
3217 #endif
3218
3219 static 
3220 inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
3221 {
3222     CONTROL_MSG         *control_msg_tmp;
3223     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
3224
3225     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3226     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
3227             /* connection not exists yet */
3228 #if CMK_SMP
3229             /* non-smp case, connect is issued in send_smsg_message */
3230         if (smsg_connected_flag[ptr->destNode] == 0)
3231             connect_to(ptr->destNode); 
3232 #endif
3233     }
3234     else
3235     switch(ptr->tag)
3236     {
3237     case SMALL_DATA_TAG:
3238         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3239         if(status == GNI_RC_SUCCESS)
3240         {
3241             CmiFree(ptr->msg);
3242         }
3243         break;
3244     case LMSG_INIT_TAG:
3245         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3246         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr);
3247         break;
3248 #if !REMOTE_EVENT && !CQWRITE
3249     case ACK_TAG:
3250         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3251         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3252         break;
3253 #endif
3254     case BIG_MSG_TAG:
3255         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3256         if(status == GNI_RC_SUCCESS)
3257         {
3258             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3259         }
3260         break;
3261 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3262     case PUT_DONE_TAG:
3263         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3264         if(status == GNI_RC_SUCCESS)
3265         {
3266             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3267         }
3268         break;
3269 #endif
3270 #if CMK_DIRECT
3271     case DIRECT_PUT_DONE_TAG:
3272         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3273         if(status == GNI_RC_SUCCESS)
3274         {
3275             free((CMK_DIRECT_HEADER*)ptr->msg);
3276         }
3277         break;
3278 #endif
3279     default:
3280         printf("Weird tag\n");
3281         CmiAbort("should not happen\n");
3282     }       // end switch
3283     return status;
3284 }
3285
3286 // return 1 if all messages are sent
3287 #if CMK_SMP
3288
3289 #if ONE_SEND_QUEUE
3290
3291 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3292 {
3293     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3294     CONTROL_MSG         *control_msg_tmp;
3295     gni_return_t        status;
3296     int                 done = 1;
3297     uint64_t            register_size;
3298     void                *register_addr;
3299     int                 index_previous = -1;
3300 #if     CMI_SENDBUFFERSMSG_CAP
3301     int                 sent_length = 0;
3302 #endif
3303     int          index = 0;
3304     memset(destpe_avail, 0, mysize * sizeof(char));
3305     for (index=0; index<1; index++)
3306     {
3307         int i, len = PCQueueLength(queue->sendMsgBuf);
3308         for (i=0; i<len; i++) 
3309         {
3310             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3311             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3312             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3313             if(ptr == NULL) break;
3314             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3315                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3316                 continue;
3317             }
3318             status = _sendOneBufferedSmsg(queue, ptr);
3319 #if CMI_SENDBUFFERSMSG_CAP
3320             sent_length++;
3321 #endif
3322             if(status == GNI_RC_SUCCESS)
3323             {
3324 #if PRINT_SYH
3325                 buffered_smsg_counter--;
3326                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3327 #endif
3328                 FreeMsgList(ptr);
3329             }else {
3330                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3331                 done = 0;
3332                 if(status == GNI_RC_ERROR_RESOURCE)
3333                 {
3334                     destpe_avail[ptr->destNode] = 1;
3335                 }
3336             } 
3337         } //end while
3338     }   // end pooling for all cores
3339     return done;
3340 }
3341
3342 #else   /*  ! ONE_SEND_QUEUE  */
3343
3344 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3345 {
3346     MSG_LIST            *ptr;
3347     gni_return_t        status;
3348     int                 done = 1;
3349 #if     CMI_SENDBUFFERSMSG_CAP
3350     int                 sent_length = 0;
3351 #endif
3352     int idx;
3353 #if SMP_LOCKS
3354     int          index = -1;
3355     int nonempty = PCQueueLength(queue->nonEmptyQueues);
3356     for(idx =0; idx<nonempty; idx++) 
3357     {
3358         index++;  if (index >= nonempty) index = 0;
3359 #if CMI_SENDBUFFERSMSG_CAP
3360         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3361 #endif
3362         CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
3363         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
3364         CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
3365         if(current_list == NULL) break; 
3366         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
3367             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3368             continue;
3369         }
3370         PCQueue current_queue= current_list->sendSmsgBuf;
3371         CmiLock(current_list->lock);
3372         int i, len = PCQueueLength(current_queue);
3373         current_list->pushed = 0;
3374         CmiUnlock(current_list->lock);
3375 #else      /* ! SMP_LOCKS */
3376     static int          index = -1;
3377     for(idx =0; idx<mysize; idx++) 
3378     {
3379         index++;  if (index == mysize) index = 0;
3380 #if CMI_SENDBUFFERSMSG_CAP
3381         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3382 #endif
3383         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
3384         //if (index == myrank) continue;
3385         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3386         int i, len = PCQueueLength(current_queue);
3387 #endif
3388         for (i=0; i<len; i++)  {
3389             CMI_PCQUEUEPOP_LOCK(current_queue)
3390             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3391             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3392             if (ptr == 0) break;
3393
3394             status = _sendOneBufferedSmsg(queue, ptr);
3395 #if CMI_SENDBUFFERSMSG_CAP
3396             sent_length++;
3397 #endif
3398             if(status == GNI_RC_SUCCESS)
3399             {
3400 #if PRINT_SYH
3401                 buffered_smsg_counter--;
3402                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3403 #endif
3404                 FreeMsgList(ptr);
3405             }else {
3406                 PCQueuePush(current_queue, (char*)ptr);
3407                 done = 0;
3408                 if(status == GNI_RC_ERROR_RESOURCE)
3409                 {
3410                     break;
3411                 }
3412             } 
3413         } //end for i
3414 #if SMP_LOCKS
3415         CmiLock(current_list->lock);
3416         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3417         {
3418             current_list->pushed = 1;
3419             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3420         }
3421         CmiUnlock(current_list->lock); 
3422 #endif
3423     }   // end pooling for all cores
3424     return done;
3425 }
3426
3427 #endif
3428
3429 #else              /* non-smp */
3430
3431 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3432 {
3433     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3434     CONTROL_MSG         *control_msg_tmp;
3435     gni_return_t        status;
3436     int                 done = 1;
3437     uint64_t            register_size;
3438     void                *register_addr;
3439     int                 index_previous = -1;
3440 #if     CMI_SENDBUFFERSMSG_CAP
3441     int                 sent_length = 0;
3442 #endif
3443     int index = queue->smsg_head_index;
3444     while(index != -1)
3445     {
3446         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3447         pre = 0;
3448         while(ptr != 0)
3449         {
3450             status = _sendOneBufferedSmsg(queue, ptr);
3451 #if CMI_SENDBUFFERSMSG_CAP
3452             sent_length++;
3453 #endif
3454             if(status == GNI_RC_SUCCESS)
3455             {
3456 #if PRINT_SYH
3457                 buffered_smsg_counter--;
3458                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3459 #endif
3460                 tmp_ptr = ptr;
3461                 if(pre)
3462                 {
3463                     ptr = pre ->next = ptr->next;
3464                 }else
3465                 {
3466                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3467                 }
3468                 FreeMsgList(tmp_ptr);
3469             }else {
3470                 pre = ptr;
3471                 ptr=ptr->next;
3472                 done = 0;
3473                 if(status == GNI_RC_ERROR_RESOURCE)
3474                 {
3475                     break;
3476                 }
3477             } 
3478         } //end while
3479         if(ptr == 0)
3480             queue->smsg_msglist_index[index].tail = pre;
3481         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3482         {
3483             if(index_previous != -1)
3484                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3485             else
3486                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3487         }
3488         else {
3489             index_previous = index;
3490         }
3491         index = queue->smsg_msglist_index[index].next;
3492
3493     }   // end pooling for all cores
3494     return done;
3495 }
3496 #endif
3497
3498 static void ProcessDeadlock();
3499 void LrtsAdvanceCommunication(int whileidle)
3500 {
3501     static int count = 0;
3502     /*  Receive Msg first */
3503 #if CMK_SMP_TRACE_COMMTHREAD
3504     double startT, endT;
3505 #endif
3506     if (useDynamicSMSG && whileidle)
3507     {
3508 #if CMK_SMP_TRACE_COMMTHREAD
3509         startT = CmiWallTimer();
3510 #endif
3511         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3512 #if CMK_SMP_TRACE_COMMTHREAD
3513         endT = CmiWallTimer();
3514         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3515 #endif
3516     }
3517
3518     // Receiving small messages and persistent
3519 #if CMK_SMP_TRACE_COMMTHREAD
3520     startT = CmiWallTimer();
3521 #endif
3522     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3523     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3524 #if CMK_SMP_TRACE_COMMTHREAD
3525     endT = CmiWallTimer();
3526     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3527 #endif
3528
3529 #if !MULTI_THREAD_SEND
3530 #if REMOTE_EVENT
3531     RECV_PERSISTENT
3532 #endif
3533     SEND_PERSISTENT
3534 #endif
3535     //sending small messages
3536     /* Send buffered Message */
3537 #if CMK_SMP_TRACE_COMMTHREAD
3538     startT = CmiWallTimer();
3539 #endif
3540 #if CMK_USE_OOB
3541     SendBufferMsg(&smsg_oob_queue, NULL);
3542     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
3543 #else
3544     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
3545 #endif
3546 #if CMK_SMP_TRACE_COMMTHREAD
3547     endT = CmiWallTimer();
3548     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3549 #endif
3550
3551 #if REMOTE_EVENT
3552     RECV_PERSISTENT
3553 #endif
3554     SEND_PERSISTENT
3555
3556     //Pump Get messages or PUT messages
3557 #if CMK_SMP_TRACE_COMMTHREAD
3558     startT = CmiWallTimer();
3559 #endif
3560     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3561 #if MULTI_THREAD_SEND
3562     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3563 #endif
3564 #if CMK_SMP_TRACE_COMMTHREAD
3565     endT = CmiWallTimer();
3566     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3567 #endif
3568
3569 #if CMK_USE_OOB
3570     SendBufferMsg(&smsg_oob_queue, NULL);
3571 #endif
3572 #if !MULTI_THREAD_SEND
3573 #if REMOTE_EVENT
3574     RECV_PERSISTENT
3575 #endif
3576     SEND_PERSISTENT
3577 #endif
3578     //Pump Remote event
3579
3580 #if CMK_SMP_TRACE_COMMTHREAD
3581     startT = CmiWallTimer();
3582 #endif
3583
3584 #if CQWRITE
3585     PumpCqWriteTransactions();
3586 #endif
3587
3588 #if REMOTE_EVENT
3589     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
3590 #endif
3591
3592 #if CMK_SMP_TRACE_COMMTHREAD
3593     endT = CmiWallTimer();
3594     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3595 #endif
3596  
3597 #if CMK_USE_OOB
3598     SendBufferMsg(&smsg_oob_queue, NULL);
3599 #endif
3600     //Send pending RDMA transactions
3601 #if !MULTI_THREAD_SEND
3602 #if REMOTE_EVENT
3603     RECV_PERSISTENT
3604 #endif
3605     SEND_PERSISTENT
3606 #endif
3607
3608 #if CMK_SMP_TRACE_COMMTHREAD
3609     startT = CmiWallTimer();
3610 #endif
3611 #if CMK_SMP
3612     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3613 #else
3614     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3615 #endif
3616     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3617 #if CMK_SMP_TRACE_COMMTHREAD
3618     endT = CmiWallTimer();
3619     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3620 #endif
3621
3622 #if CMK_USE_OOB
3623     SendBufferMsg(&smsg_oob_queue, NULL);
3624 #endif
3625 #if REMOTE_EVENT
3626     RECV_PERSISTENT
3627 #endif
3628     SEND_PERSISTENT
3629 #if CMK_SMP && ! LARGEPAGE
3630     if (_detected_hang)  ProcessDeadlock();
3631 #endif
3632 }
3633
3634 static void set_smsg_max()
3635 {
3636     char *env;
3637
3638     if(mysize <=512)
3639     {
3640         SMSG_MAX_MSG = 1024;
3641     }else if (mysize <= 4096)
3642     {
3643         SMSG_MAX_MSG = 1024;
3644     }else if (mysize <= 16384)
3645     {
3646         SMSG_MAX_MSG = 512;
3647     }else {
3648         SMSG_MAX_MSG = 256;
3649     }
3650
3651     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3652     if (env) SMSG_MAX_MSG = atoi(env);
3653     CmiAssert(SMSG_MAX_MSG > 0);
3654 }    
3655
3656 /* useDynamicSMSG */
3657 static void _init_dynamic_smsg()
3658 {
3659     gni_return_t status;
3660     uint32_t     vmdh_index = -1;
3661     int i;
3662
3663     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3664     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3665     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3666     for(i=0; i<mysize; i++) {
3667         smsg_connected_flag[i] = 0;
3668         smsg_attr_vector_local[i] = NULL;
3669         smsg_attr_vector_remote[i] = NULL;
3670     }
3671
3672     set_smsg_max();
3673
3674     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3675     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3676     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3677     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3678     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3679
3680     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3681     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3682     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3683     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3684     mailbox_list->offset = 0;
3685     mailbox_list->next = 0;
3686     
3687     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3688         mailbox_list->size, smsg_rx_cqh,
3689         GNI_MEM_READWRITE,   
3690         vmdh_index,
3691         &(mailbox_list->mem_hndl));
3692     GNI_RC_CHECK("MEMORY registration for smsg", status);
3693
3694     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3695     GNI_RC_CHECK("Unbound EP", status);
3696     
3697     alloc_smsg_attr(&send_smsg_attr);
3698
3699     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3700     GNI_RC_CHECK("post unbound datagram", status);
3701
3702       /* always pre-connect to proc 0 */
3703     //if (myrank != 0) connect_to(0);
3704
3705     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3706     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3707 }
3708
3709 static void _init_static_smsg()
3710 {
3711     gni_smsg_attr_t      *smsg_attr;
3712     gni_smsg_attr_t      remote_smsg_attr;
3713     gni_smsg_attr_t      *smsg_attr_vec;
3714     gni_mem_handle_t     my_smsg_mdh_mailbox;
3715     int      ret, i;
3716     gni_return_t status;
3717     uint32_t              vmdh_index = -1;
3718     mdh_addr_t            base_infor;
3719     mdh_addr_t            *base_addr_vec;
3720
3721     set_smsg_max();
3722     
3723     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3724     
3725     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3726     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3727     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3728     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3729     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3730     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3731     CmiAssert(ret == 0);
3732     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3733     
3734     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3735             smsg_memlen*(mysize), smsg_rx_cqh,
3736             GNI_MEM_READWRITE,   
3737             vmdh_index,
3738             &my_smsg_mdh_mailbox);
3739     register_memory_size += smsg_memlen*(mysize);
3740     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3741
3742     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3743     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3744
3745     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3746     base_infor.mdh =  my_smsg_mdh_mailbox;
3747     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3748
3749     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3750  
3751     for(i=0; i<mysize; i++)
3752     {
3753         if(i==myrank)
3754             continue;
3755         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3756         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3757         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3758         smsg_attr[i].mbox_offset = i*smsg_memlen;
3759         smsg_attr[i].buff_size = smsg_memlen;
3760         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3761         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3762     }
3763
3764     for(i=0; i<mysize; i++)
3765     {
3766         if (myrank == i) continue;
3767
3768         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3769         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3770         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3771         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3772         remote_smsg_attr.buff_size = smsg_memlen;
3773         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3774         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3775
3776         /* initialize the smsg channel */
3777         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3778         GNI_RC_CHECK("SMSG Init", status);
3779     } //end initialization
3780
3781     free(base_addr_vec);
3782     free(smsg_attr);
3783
3784     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3785     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3786
3787
3788 inline
3789 static void _init_send_queue(SMSG_QUEUE *queue)
3790 {
3791      int i;
3792 #if ONE_SEND_QUEUE
3793      queue->sendMsgBuf = PCQueueCreate();
3794      destpe_avail = (char*)malloc(mysize * sizeof(char));
3795 #else
3796      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3797 #if CMK_SMP && SMP_LOCKS
3798      queue->nonEmptyQueues = PCQueueCreate();
3799 #endif
3800      for(i =0; i<mysize; i++)
3801      {
3802 #if CMK_SMP
3803          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3804 #if SMP_LOCKS
3805          queue->smsg_msglist_index[i].pushed = 0;
3806          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3807          queue->smsg_msglist_index[i].destpe = i;
3808 #endif
3809 #else
3810          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3811          queue->smsg_msglist_index[i].next = -1;
3812          queue->smsg_head_index = -1;
3813 #endif
3814         
3815      }
3816 #endif
3817 }
3818
3819 inline
3820 static void _init_smsg()
3821 {
3822     if(mysize > 1) {
3823         if (useDynamicSMSG)
3824             _init_dynamic_smsg();
3825         else
3826             _init_static_smsg();
3827     }
3828
3829     _init_send_queue(&smsg_queue);
3830 #if CMK_USE_OOB
3831     _init_send_queue(&smsg_oob_queue);
3832 #endif
3833 }
3834
3835 static void _init_static_msgq()
3836 {
3837     gni_return_t status;
3838     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3839     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3840     msgq_attrs.smsg_q_sz = 1;
3841     msgq_attrs.rcv_pool_sz = 1;
3842     msgq_attrs.num_msgq_eps = 2;
3843     msgq_attrs.nloc_insts = 8;
3844     msgq_attrs.modes = 0;
3845     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3846
3847     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3848     GNI_RC_CHECK("MSGQ Init", status);
3849
3850
3851 }
3852
3853
3854 static CmiUInt8 total_mempool_size = 0;
3855 static CmiUInt8 total_mempool_calls = 0;
3856
3857 #if USE_LRTS_MEMPOOL
3858
3859 #if CMK_PERSISTENT_COMM
3860 void *alloc_persistent_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3861 {
3862     void *pool;
3863     int ret;
3864     gni_return_t status = GNI_RC_SUCCESS;
3865
3866     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3867     if (*size < default_size) *size = default_size;
3868 #if LARGEPAGE
3869     // round up to be multiple of _tlbpagesize
3870     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3871     *size = ALIGNHUGEPAGE(*size);
3872 #endif
3873     total_mempool_size += *size;
3874     total_mempool_calls += 1;
3875 #if   !LARGEPAGE
3876     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3877     {
3878         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3879         CmiAbort("alloc_mempool_block");
3880     }
3881 #endif
3882 #if LARGEPAGE
3883     pool = my_get_huge_pages(*size);
3884     ret = pool==NULL;
3885 #else
3886     ret = posix_memalign(&pool, ALIGNBUF, *size);
3887 #endif
3888     if (ret != 0) {
3889       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3890       if (ret == ENOMEM)
3891         CmiAbort("alloc_mempool_block: out of memory.");
3892       else
3893         CmiAbort("alloc_mempool_block: posix_memalign failed");
3894     }
3895 #if LARGEPAGE
3896     CmiMemLock();
3897     register_count++;
3898     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, persistent_rx_cqh, status);
3899     CmiMemUnlock();
3900     if(status != GNI_RC_SUCCESS) {
3901         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3902 sweep_mempool(CpvAccess(mempool));
3903     }
3904     GNI_RC_CHECK("MEMORY_REGISTER", status);
3905 #else
3906     SetMemHndlZero((*mem_hndl));
3907 #endif
3908     return pool;
3909 }
3910 #endif
3911
3912 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3913 {
3914     void *pool;
3915     int ret;
3916     gni_return_t status = GNI_RC_SUCCESS;
3917
3918     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3919     if (*size < default_size) *size = default_size;
3920 #if LARGEPAGE
3921     // round up to be multiple of _tlbpagesize
3922     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3923     *size = ALIGNHUGEPAGE(*size);
3924 #endif
3925     total_mempool_size += *size;
3926     total_mempool_calls += 1;
3927 #if   !LARGEPAGE
3928     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3929     {
3930         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3931         CmiAbort("alloc_mempool_block");
3932     }
3933 #endif
3934 #if LARGEPAGE
3935     pool = my_get_huge_pages(*size);
3936     ret = pool==NULL;
3937 #else
3938     ret = posix_memalign(&pool, ALIGNBUF, *size);
3939 #endif
3940     if (ret != 0) {
3941       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3942       if (ret == ENOMEM)
3943         CmiAbort("alloc_mempool_block: out of memory.");
3944       else
3945         CmiAbort("alloc_mempool_block: posix_memalign failed");
3946     }
3947 #if LARGEPAGE
3948     CmiMemLock();
3949     register_count++;
3950     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3951     CmiMemUnlock();
3952     if(status != GNI_RC_SUCCESS) {
3953         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3954 sweep_mempool(CpvAccess(mempool));
3955     }
3956     GNI_RC_CHECK("MEMORY_REGISTER", status);
3957 #else
3958     SetMemHndlZero((*mem_hndl));
3959 #endif
3960     return pool;
3961 }
3962
3963 // ptr is a block head pointer
3964 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3965 {
3966     if(!(IsMemHndlZero(mem_hndl)))
3967     {
3968         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3969     }
3970 #if LARGEPAGE
3971     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3972 #else
3973     free(ptr);
3974 #endif
3975 }
3976 #endif
3977
3978 void LrtsPreCommonInit(int everReturn){
3979 #if USE_LRTS_MEMPOOL
3980     CpvInitialize(mempool_type*, mempool);
3981     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3982 #if CMK_PERSISTENT_COMM
3983     CpvInitialize(mempool_type*, persistent_mempool);
3984     CpvAccess(persistent_mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3985 #endif
3986     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3987 #endif
3988 }
3989
3990 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3991 {
3992     register int            i;
3993     int                     rc;
3994     int                     device_id = 0;
3995     unsigned int            remote_addr;
3996     gni_cdm_handle_t        cdm_hndl;
3997     gni_return_t            status = GNI_RC_SUCCESS;
3998     uint32_t                vmdh_index = -1;
3999     uint8_t                 ptag;
4000     unsigned int            local_addr, *MPID_UGNI_AllAddr;
4001     int                     first_spawned;
4002     int                     physicalID;
4003     char                   *env;
4004
4005     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
4006     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
4007     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
4008    
4009     status = PMI_Init(&first_spawned);
4010     GNI_RC_CHECK("PMI_Init", status);
4011
4012     status = PMI_Get_size(&mysize);
4013     GNI_RC_CHECK("PMI_Getsize", status);
4014
4015     status = PMI_Get_rank(&myrank);
4016     GNI_RC_CHECK("PMI_getrank", status);
4017
4018     //physicalID = CmiPhysicalNodeID(myrank);
4019     
4020     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
4021
4022     *myNodeID = myrank;
4023     *numNodes = mysize;
4024   
4025 #if MULTI_THREAD_SEND
4026     /* Currently, we only consider the case that comm. thread will only recv msgs */
4027     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
4028 #endif
4029
4030 #if CMI_EXERT_SEND_LARGE_CAP
4031     CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
4032 #endif
4033
4034 #if CMI_SENDBUFFERSMSG_CAP
4035     CmiGetArgInt(*argv,"+useSendBufferCap", &SendBufferMsg_cap);
4036 #endif
4037
4038 #if CMI_PUMPNETWORKSMSG_CAP 
4039     CmiGetArgInt(*argv,"+usePumpSmsgCap", &PumpNetworkSmsg_cap);
4040 #endif
4041
4042     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
4043     
4044     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
4045     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
4046     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
4047
4048     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
4049     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
4050     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
4051
4052     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
4053     if (env) useDynamicSMSG = 1;
4054     if (!useDynamicSMSG)
4055       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
4056     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
4057     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
4058     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
4059     
4060     if(myrank == 0)
4061     {
4062         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
4063         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
4064     }
4065 #ifdef USE_ONESIDED
4066     onesided_init(NULL, &onesided_hnd);
4067
4068     // this is a GNI test, so use the libonesided bypass functionality
4069     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
4070     local_addr = gniGetNicAddress();
4071 #else
4072     ptag = get_ptag();
4073     cookie = get_cookie();
4074 #if 0
4075     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
4076 #endif
4077     //Create and attach to the communication  domain */
4078     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
4079     GNI_RC_CHECK("GNI_CdmCreate", status);
4080     //* device id The device id is the minor number for the device
4081     //that is assigned to the device by the system when the device is created.
4082     //To determine the device number, look in the /dev directory, which contains a list of devices. For