added average statistic and reduce some minor overhead in remote event
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on the total mempool size allocated; this prevents the mempool
18     # from using too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/dir.h>
38 #include <sys/stat.h>
39 #include <gni_pub.h>
40 #include <pmi.h>
41 //#include <numatoolkit.h>
42
43 #include "converse.h"
44
45 #if CMK_DIRECT
46 #include "cmidirect.h"
47 #endif
48
49 #define     LARGEPAGE              0
50
51 #if CMK_SMP
52 #define MULTI_THREAD_SEND          0
53 #define COMM_THREAD_SEND           1
54 #endif
55
56 #if MULTI_THREAD_SEND
57 #define CMK_WORKER_SINGLE_TASK     0
58 #endif
59
60 #define REMOTE_EVENT               1
61 #define CQWRITE                    0
62
63 #define CMI_EXERT_SEND_CAP      0
64 #define CMI_EXERT_RECV_CAP      0
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP  32
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP  4                  /* cap <= 2 sometimes hang */
72 #endif
73 //#define USE_RDMA_CAP   0 
74 int   RDMA_cap =   100;
75 int   RDMA_pending = 0;
76
77 #define USE_LRTS_MEMPOOL                  1
78
79 #define PRINT_SYH                         0
80
81 // Trace communication thread
82 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
83 #define TRACE_THRESHOLD     0.00005
84 #define CMI_MPI_TRACE_MOREDETAILED 0
85 #undef CMI_MPI_TRACE_USEREVENTS
86 #define CMI_MPI_TRACE_USEREVENTS 1
87 #else
88 #undef CMK_SMP_TRACE_COMMTHREAD
89 #define CMK_SMP_TRACE_COMMTHREAD 0
90 #endif
91
92 #define CMK_TRACE_COMMOVERHEAD 0
93 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
94 #undef CMI_MPI_TRACE_USEREVENTS
95 #define CMI_MPI_TRACE_USEREVENTS 1
96 #else
97 #undef CMK_TRACE_COMMOVERHEAD
98 #define CMK_TRACE_COMMOVERHEAD 0
99 #endif
100
101 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
102 CpvStaticDeclare(double, projTraceStart);
103 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
104 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
105 #else
106 #define  START_EVENT()
107 #define  END_EVENT(x)
108 #endif
109
110 #if USE_LRTS_MEMPOOL
111
112 #define oneMB (1024ll*1024)
113 #define oneGB (1024ll*1024*1024)
114
115 static CmiInt8 _mempool_size = 8*oneMB;
116 static CmiInt8 _expand_mem =  4*oneMB;
117 static CmiInt8 _mempool_size_limit = 0;
118
119 static CmiInt8 _totalmem = 0.8*oneGB;
120
121 #if LARGEPAGE
122 static CmiInt8 BIG_MSG  =  16*oneMB;
123 static CmiInt8 ONE_SEG  =  4*oneMB;
124 #else
125 static CmiInt8 BIG_MSG  =  4*oneMB;
126 static CmiInt8 ONE_SEG  =  2*oneMB;
127 #endif
128 #if MULTI_THREAD_SEND
129 static int BIG_MSG_PIPELINE = 1;
130 #else
131 static int BIG_MSG_PIPELINE = 4;
132 #endif
133
134 // dynamic flow control
135 static CmiInt8 buffered_send_msg = 0;
136 static CmiInt8 register_memory_size = 0;
137
138 #if LARGEPAGE
139 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
140 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
141 static CmiInt8  register_count = 0;
142 #else
143 #if CMK_SMP && COMM_THREAD_SEND 
144 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
145 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
146 #else
147 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
148 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
149 #endif
150
151
152 #endif
153
154 #endif     /* end USE_LRTS_MEMPOOL */
155
156 #if MULTI_THREAD_SEND 
157 #define     CMI_GNI_LOCK(x)       CmiLock(x);
158 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
159 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
160 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
161 #else
162 #define     CMI_GNI_LOCK(x)
163 #define     CMI_GNI_UNLOCK(x)
164 #define     CMI_PCQUEUEPOP_LOCK(Q)   
165 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
166 #endif
167
168 static int _tlbpagesize = 4096;
169
170 //static int _smpd_count  = 0;
171
172 static int   user_set_flag  = 0;
173
174 static int _checkProgress = 1;             /* check deadlock */
175 static int _detected_hang = 0;
176
177 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
178
179 // dynamic SMSG
180 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
181
182 static int avg_smsg_connection = 32;
183 static int                 *smsg_connected_flag= 0;
184 static gni_smsg_attr_t     **smsg_attr_vector_local;
185 static gni_smsg_attr_t     **smsg_attr_vector_remote;
186 static gni_ep_handle_t     ep_hndl_unbound;
187 static gni_smsg_attr_t     send_smsg_attr;
188 static gni_smsg_attr_t     recv_smsg_attr;
189
190 typedef struct _dynamic_smsg_mailbox{
191    void     *mailbox_base;
192    int      size;
193    int      offset;
194    gni_mem_handle_t  mem_hndl;
195    struct      _dynamic_smsg_mailbox  *next;
196 }dynamic_smsg_mailbox_t;
197
198 static dynamic_smsg_mailbox_t  *mailbox_list;
199
200 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
201 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
202
203 #if PRINT_SYH
204 int         lrts_send_msg_id = 0;
205 int         lrts_local_done_msg = 0;
206 int         lrts_send_rdma_success = 0;
207 #endif
208
209 #include "machine.h"
210
211 #include "pcqueue.h"
212
213 #include "mempool.h"
214
215 #if CMK_PERSISTENT_COMM
216 #include "machine-persistent.h"
217 #endif
218
219 //#define  USE_ONESIDED 1
220 #ifdef USE_ONESIDED
221 //onesided implementation is wrong, since no place to restore omdh
222 #include "onesided.h"
223 onesided_hnd_t   onesided_hnd;
224 onesided_md_t    omdh;
225 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
226
227 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
228
229 #else
230 uint8_t   onesided_hnd, omdh;
231
232 #if REMOTE_EVENT || CQWRITE 
233 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
234     if(register_memory_size+size>= MAX_REG_MEM) { \
235         status = GNI_RC_ERROR_NOMEM;} \
236     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
237         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
238 #else
239 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
240         if (register_memory_size + size >= MAX_REG_MEM) { \
241             status = GNI_RC_ERROR_NOMEM; \
242         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
243             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
244 #endif
245
246 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
247     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
248              register_memory_size -= size; \
249          else CmiAbort("MEM_DEregister");  \
250     } while (0)
251 #endif
252
/*
 * Accessors for the mempool block that owns a message buffer.
 * GetMempoolBlockPtr() steps ALIGNBUF bytes back from the buffer pointer to
 * the mempool_header stored in front of it and follows its block_ptr.
 *
 * Every parameter and every replacement list is parenthesised so the macros
 * compose correctly inside larger expressions (e.g. arithmetic around
 * NoMsgInSend(), or pointer expressions passed as arguments). The field
 * accessors still expand to lvalues, so `&GetMemHndl(x)` and assignments
 * through them keep working.
 */
#define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
#define   GetMempoolPtr(x)        (GetMempoolBlockPtr(x)->mptr)
#define   GetMempoolsize(x)       (GetMempoolBlockPtr(x)->size)
#define   GetMemHndl(x)           (GetMempoolBlockPtr(x)->mem_hndl)
/* Per-block counters of messages currently being received / sent. */
#define   IncreaseMsgInRecv(x)    ((GetMempoolBlockPtr(x)->msgs_in_recv)++)
#define   DecreaseMsgInRecv(x)    ((GetMempoolBlockPtr(x)->msgs_in_recv)--)
#define   IncreaseMsgInSend(x)    ((GetMempoolBlockPtr(x)->msgs_in_send)++)
#define   DecreaseMsgInSend(x)    ((GetMempoolBlockPtr(x)->msgs_in_send)--)
#define   NoMsgInSend(x)          (GetMempoolBlockPtr(x)->msgs_in_send == 0)
#define   NoMsgInRecv(x)          (GetMempoolBlockPtr(x)->msgs_in_recv == 0)
#define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
/* A memory handle whose two qwords are both zero is treated as "not registered". */
#define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
#define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
#define   NotRegistered(x)        IsMemHndlZero(((block_header*)(x))->mem_hndl)

/* Same fields, reached directly from a block_header pointer. */
#define   GetMemHndlFromBlockHeader(x) (((block_header*)(x))->mem_hndl)
#define   GetSizeFromBlockHeader(x)    (((block_header*)(x))->size)

/* Size and sequence number stored in the extended Converse message header. */
#define CmiGetMsgSize(m)     (((CmiMsgHeaderExt*)(m))->size)
#define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)(m))->size)=(s))
#define CmiGetMsgSeq(m)      (((CmiMsgHeaderExt*)(m))->seq)
#define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)(m))->seq) = (s))

/* Distance in bytes between a message buffer and its mempool_header. */
#define ALIGNBUF                64
277
278 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
279 /* If SMSG is not used */
280
281 #define FMA_PER_CORE  1024
282 #define FMA_BUFFER_SIZE 1024
283
284 /* If SMSG is used */
285 static int  SMSG_MAX_MSG = 1024;
286 #define SMSG_MAX_CREDIT    72
287
288 #define MSGQ_MAXSIZE       2048
289
290 /* large message transfer with FMA or BTE */
291 #if ! REMOTE_EVENT
292 #define LRTS_GNI_RDMA_THRESHOLD  1024 
293 #else
294    /* remote events only work with RDMA */
295 #define LRTS_GNI_RDMA_THRESHOLD  0 
296 #endif
297
298 #if CMK_SMP
299 static int  REMOTE_QUEUE_ENTRIES=163840; 
300 static int LOCAL_QUEUE_ENTRIES=163840; 
301 #else
302 static int  REMOTE_QUEUE_ENTRIES=20480;
303 static int LOCAL_QUEUE_ENTRIES=20480; 
304 #endif
305
306 #define BIG_MSG_TAG             0x26
307 #define PUT_DONE_TAG            0x28
308 #define DIRECT_PUT_DONE_TAG     0x29
309 #define ACK_TAG                 0x30
310 /* SMSG is data message */
311 #define SMALL_DATA_TAG          0x31
312 /* SMSG is a control message to initialize a BTE */
313 #define LMSG_INIT_TAG           0x39 
314
315 #define DEBUG
316 #ifdef GNI_RC_CHECK
317 #undef GNI_RC_CHECK
318 #endif
319 #ifdef DEBUG
320 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
321 #else
322 #define GNI_RC_CHECK(msg,rc)
323 #endif
324
325 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
326 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
327 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
328
329 static int useStaticMSGQ = 0;
330 static int useStaticFMA = 0;
331 static int mysize, myrank;
332 static gni_nic_handle_t   nic_hndl;
333
334 typedef struct {
335     gni_mem_handle_t mdh;
336     uint64_t addr;
337 } mdh_addr_t ;
338 // this is related to dynamic SMSG
339
340 typedef struct mdh_addr_list{
341     gni_mem_handle_t mdh;
342    void *addr;
343     struct mdh_addr_list *next;
344 }mdh_addr_list_t;
345
346 static unsigned int         smsg_memlen;
347 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
348 mdh_addr_t          setup_mem;
349 mdh_addr_t          *smsg_connection_vec = 0;
350 gni_mem_handle_t    smsg_connection_memhndl;
351 static int          smsg_expand_slots = 10;
352 static int          smsg_available_slot = 0;
353 static void         *smsg_mailbox_mempool = 0;
354 mdh_addr_list_t     *smsg_dynamic_list = 0;
355
356 static void             *smsg_mailbox_base;
357 gni_msgq_attr_t         msgq_attrs;
358 gni_msgq_handle_t       msgq_handle;
359 gni_msgq_ep_attr_t      msgq_ep_attrs;
360 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
361
362 /* =====Beginning of Declarations of Machine Specific Variables===== */
363 static int cookie;
364 static int modes = 0;
365 static gni_cq_handle_t       smsg_rx_cqh = NULL;
366 static gni_cq_handle_t       default_tx_cqh = NULL;
367 static gni_cq_handle_t       rdma_tx_cqh = NULL;
368 static gni_cq_handle_t       rdma_rx_cqh = NULL;
369 static gni_cq_handle_t       post_tx_cqh = NULL;
370 static gni_ep_handle_t       *ep_hndl_array;
371
372 static CmiNodeLock           *ep_lock_array;
373 static CmiNodeLock           default_tx_cq_lock; 
374 static CmiNodeLock           rdma_tx_cq_lock; 
375 static CmiNodeLock           global_gni_lock; 
376 static CmiNodeLock           rx_cq_lock;
377 static CmiNodeLock           smsg_mailbox_lock;
378 static CmiNodeLock           smsg_rx_cq_lock;
379 static CmiNodeLock           *mempool_lock;
380 //#define     CMK_WITH_STATS      1
/*
 * One entry of the buffered-send list.
 * Non-SMP builds chain entries through `next`; the same link is reused by the
 * MSG_LIST free list (FreeMsgList / MallocMsgList).
 */
typedef struct msg_list
{
    uint32_t destNode;          /* presumably the destination node of the message */
    uint32_t size;              /* presumably the payload size in bytes */
    void *msg;                  /* the buffered message itself */
    uint8_t tag;                /* message tag, compared against the *_TAG constants by the stats macros */
#if !CMK_SMP
    struct msg_list *next;      /* singly-linked list / free-list link */
#endif
#if CMK_WITH_STATS
    double  creation_time;      /* CmiWallTimer() value recorded by SMSG_CREATION */
#endif
} MSG_LIST;
394
395
396 typedef struct control_msg
397 {
398     uint64_t            source_addr;    /* address from the start of buffer  */
399     uint64_t            dest_addr;      /* address from the start of buffer */
400     int                 total_length;   /* total length */
401     int                 length;         /* length of this packet */
402 #if REMOTE_EVENT
403     int                 ack_index;      /* index from integer to address */
404 #endif
405     uint8_t             seq_id;         //big message   0 meaning single message
406     gni_mem_handle_t    source_mem_hndl;
407     struct control_msg *next;
408 } CONTROL_MSG;
409
410 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
411
412 typedef struct ack_msg
413 {
414     uint64_t            source_addr;    /* address from the start of buffer  */
415 #if ! USE_LRTS_MEMPOOL
416     gni_mem_handle_t    source_mem_hndl;
417     int                 length;          /* total length */
418 #endif
419     struct ack_msg     *next;
420 } ACK_MSG;
421
422 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
423
424 #if CMK_DIRECT
425 typedef struct{
426     uint64_t    handler_addr;
427 }CMK_DIRECT_HEADER;
428
429 typedef struct {
430     char core[CmiMsgHeaderSizeBytes];
431     uint64_t handler;
432 }cmidirectMsg;
433
434 //SYH
435 CpvDeclare(int, CmiHandleDirectIdx);
436 void CmiHandleDirectMsg(cmidirectMsg* msg)
437 {
438
439     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
440    (*(_handle->callbackFnPtr))(_handle->callbackData);
441    CmiFree(msg);
442 }
443
444 void CmiDirectInit()
445 {
446     CpvInitialize(int,  CmiHandleDirectIdx);
447     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
448 }
449
450 #endif
451 typedef struct  rmda_msg
452 {
453     int                   destNode;
454 #if REMOTE_EVENT
455     int                   ack_index;
456 #endif
457     gni_post_descriptor_t *pd;
458 #if !CMK_SMP
459     struct  rmda_msg      *next;
460 #endif
461 }RDMA_REQUEST;
462
463
464 #if CMK_SMP
465 #define SMP_LOCKS               0
466 #define ONE_SEND_QUEUE                  0
467 PCQueue sendRdmaBuf;
468 typedef struct  msg_list_index
469 {
470     PCQueue     sendSmsgBuf;
471     int         pushed;
472     CmiNodeLock   lock;
473 } MSG_LIST_INDEX;
474 char                *destpe_avail;
475 #if  !ONE_SEND_QUEUE && SMP_LOCKS
476     PCQueue     nonEmptyQueues;
477 #endif
478 #else         /* non-smp */
479
480 static RDMA_REQUEST        *sendRdmaBuf = 0;
481 static RDMA_REQUEST        *sendRdmaTail = 0;
482 typedef struct  msg_list_index
483 {
484     int         next;
485     MSG_LIST    *sendSmsgBuf;
486     MSG_LIST    *tail;
487 } MSG_LIST_INDEX;
488
489 #endif
490
491 // buffered send queue
492 #if ! ONE_SEND_QUEUE
493 typedef struct smsg_queue
494 {
495     MSG_LIST_INDEX   *smsg_msglist_index;
496     int               smsg_head_index;
497 } SMSG_QUEUE;
498 #else
499 typedef struct smsg_queue
500 {
501     PCQueue       sendMsgBuf;
502 }  SMSG_QUEUE;
503 #endif
504
505 SMSG_QUEUE                  smsg_queue;
506 #if CMK_USE_OOB
507 SMSG_QUEUE                  smsg_oob_queue;
508 #endif
509
510 #if CMK_SMP
511
512 #define FreeMsgList(d)   free(d);
513 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
514
515 #else
516
517 static MSG_LIST       *msglist_freelist=0;
518
519 #define FreeMsgList(d)  \
520   do { \
521   (d)->next = msglist_freelist;\
522   msglist_freelist = d; \
523   } while (0)
524
525 #define MallocMsgList(d) \
526   do {  \
527   d = msglist_freelist;\
528   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
529              _MEMCHECK(d);\
530   } else msglist_freelist = d->next; \
531   d->next =0;  \
532   } while (0)
533
534 #endif
535
536 #if CMK_SMP
537
538 #define FreeControlMsg(d)      free(d);
539 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
540
541 #else
542
543 static CONTROL_MSG    *control_freelist=0;
544
545 #define FreeControlMsg(d)       \
546   do { \
547   (d)->next = control_freelist;\
548   control_freelist = d; \
549   } while (0);
550
551 #define MallocControlMsg(d) \
552   do {  \
553   d = control_freelist;\
554   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
555              _MEMCHECK(d);\
556   } else control_freelist = d->next;  \
557   } while (0);
558
559 #endif
560
561 #if CMK_SMP
562
563 #define FreeAckMsg(d)      free(d);
564 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
565
566 #else
567
568 static ACK_MSG        *ack_freelist=0;
569
570 #define FreeAckMsg(d)       \
571   do { \
572   (d)->next = ack_freelist;\
573   ack_freelist = d; \
574   } while (0)
575
576 #define MallocAckMsg(d) \
577   do { \
578   d = ack_freelist;\
579   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
580              _MEMCHECK(d);\
581   } else ack_freelist = d->next; \
582   } while (0)
583
584 #endif
585
586
587 # if CMK_SMP
588 #define FreeRdmaRequest(d)       free(d);
589 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
590 #else
591
592 static RDMA_REQUEST         *rdma_freelist = NULL;
593
594 #define FreeRdmaRequest(d)       \
595   do {  \
596   (d)->next = rdma_freelist;\
597   rdma_freelist = d;    \
598   } while (0)
599
600 #define MallocRdmaRequest(d) \
601   do {   \
602   d = rdma_freelist;\
603   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
604              _MEMCHECK(d);\
605   } else rdma_freelist = d->next; \
606   d->next =0;   \
607   } while (0)
608 #endif
609
610 /* reuse gni_post_descriptor_t */
611 static gni_post_descriptor_t *post_freelist=0;
612
613 #if  !CMK_SMP
/*
 * Non-SMP recycling of gni_post_descriptor_t objects through the
 * post_freelist singly-linked list (linked via next_descr).
 *
 * NOTE: both macros expand to several bare statements (not a do/while(0)
 * block), so they must only be used as a full statement inside braces,
 * never as the unbraced body of an if/else or loop.
 */

/* Push descriptor d onto the head of post_freelist. */
#define FreePostDesc(d)       \
    (d)->next_descr = post_freelist;\
    post_freelist = d;

/*
 * Pop a descriptor from post_freelist into d, or malloc a fresh one when the
 * list is empty. The allocation is checked with _MEMCHECK *before* the new
 * descriptor is written to, so a failed malloc is never dereferenced.
 */
#define MallocPostDesc(d) \
  d = post_freelist;\
  if (d==0) { \
     d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
     _MEMCHECK(d);\
     d->next_descr = 0;\
  } else post_freelist = d->next_descr;
625 #else
626
627 #define FreePostDesc(d)     free(d);
628 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
629
630 #endif
631
632
633 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
634 static int      buffered_smsg_counter = 0;
635
636 /* SmsgSend returned success but the sent message has not yet been confirmed by the remote side */
637 static MSG_LIST *buffered_fma_head = 0;
638 static MSG_LIST *buffered_fma_tail = 0;
639
640 /* functions  */
641 #define IsFree(a,ind)  !( a& (1<<(ind) ))
642 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
643 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
644
645 CpvDeclare(mempool_type*, mempool);
646
647 #if REMOTE_EVENT
648 /* ack pool for remote events */
649
650 #define SHIFT                   18
651 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
652 #define RANK_MASK               ((1<<SHIFT) - 1)
653 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
654
655 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
656 #define GET_RANK(evt)           ((evt) & RANK_MASK)
657 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
658
659 #define PERSIST_EVENT(idx)      (1<<31 | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
660
/*
 * One slot of an IndexPool: maps a small integer index (carried in a remote
 * event) back to an address.
 */
struct IndexStruct {
    void *addr;   /* address associated with this slot while it is in use */
    int   next;   /* index of the next free slot, -1 at the end of the free list */
    int   type;   /* 1: ACK   2: Persistent   (-1 while the slot is free) */
};
666
667 typedef struct IndexPool {
668     struct IndexStruct   *indexes;
669     int                   size;
670     int                   freehead;
671     CmiNodeLock           lock;
672 } IndexPool;
673
674 static IndexPool  ackPool;
675 #if CMK_PERSISTENT_COMM
676 static IndexPool  persistPool;
677 #endif
678
679 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
680 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
681
682 static void IndexPool_init(IndexPool *pool)
683 {
684     int i;
685     if ((1<<SHIFT) < mysize) 
686         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
687     pool->size = 1024;
688     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
689     for (i=0; i<pool->size-1; i++) {
690         pool->indexes[i].next = i+1;
691         pool->indexes[i].type = -1;
692     }
693     pool->indexes[i].next = -1;
694     pool->freehead = 0;
695 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
696     pool->lock  = CmiCreateLock();
697 #else
698     pool->lock  = NULL;
699 #endif
700 }
701
702 static
703 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
704 {
705     int i;
706     int s;
707 #if MULTI_THREAD_SEND
708     CmiLock(pool->lock);
709 #endif
710     s = pool->freehead;
711     if (s == -1) {
712         int newsize = pool->size * 2;
713         printf("[%d] IndexPool_getslot expand to: %d\n", myrank, newsize);
714         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
715         struct IndexStruct *old_ackpool = pool->indexes;
716         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
717         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
718         for (i=pool->size; i<newsize-1; i++) {
719             pool->indexes[i].next = i+1;
720             pool->indexes[i].type = -1;
721         }
722         pool->indexes[i].next = -1;
723         pool->freehead = pool->size;
724         s = pool->size;
725         pool->size = newsize;
726         free(old_ackpool);
727     }
728     pool->freehead = pool->indexes[s].next;
729     pool->indexes[s].addr = addr;
730     pool->indexes[s].type = type;
731 #if MULTI_THREAD_SEND
732     CmiUnlock(pool->lock);
733 #endif
734     return s;
735 }
736
737 static
738 inline  void IndexPool_freeslot(IndexPool *pool, int s)
739 {
740     CmiAssert(s>=0 && s<pool->size);
741 #if MULTI_THREAD_SEND
742     CmiLock(pool->lock);
743 #endif
744     pool->indexes[s].next = pool->freehead;
745     pool->indexes[s].type = -1;
746     pool->freehead = s;
747 #if MULTI_THREAD_SEND
748     CmiUnlock(pool->lock);
749 #endif
750 }
751
752
753 #endif
754
755 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
756 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
757 #define CHARM_MAGIC_NUMBER               126
758
759 #if CMK_ERROR_CHECKING
760 extern unsigned char computeCheckSum(unsigned char *data, int len);
761 static int checksum_flag = 0;
762 #define CMI_SET_CHECKSUM(msg, len)      \
763         if (checksum_flag)  {   \
764           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
765           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
766         }
767 #define CMI_CHECK_CHECKSUM(msg, len)    \
768         if (checksum_flag)      \
769           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
770             CmiAbort("Fatal error: checksum doesn't agree!\n");
771 #else
772 #define CMI_SET_CHECKSUM(msg, len)
773 #define CMI_CHECK_CHECKSUM(msg, len)
774 #endif
775 /* =====End of Definitions of Message-Corruption Related Macros=====*/
776
/* Non-zero when communication statistics are collected and logged. */
static int print_stats = 0;
/* Non-zero while collection is paused; the stats macros test `print_stats && !stats_off`. */
static int stats_off = 0;

/* Resume statistics collection. */
void CmiTurnOnStats()
{
    stats_off = 0;
}

/* Pause statistics collection. */
void CmiTurnOffStats()
{
    stats_off = 1;
}
788
789 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
790
791 #if CMK_WITH_STATS
792 FILE *counterLog = NULL;
793 typedef struct comm_thread_stats
794 {
795     uint64_t  smsg_data_count;
796     uint64_t  lmsg_init_count;
797     uint64_t  ack_count;
798     uint64_t  big_msg_ack_count;
799     uint64_t  smsg_count;
800     uint64_t  direct_put_done_count;
801     uint64_t  put_done_count;
802     //times of calling SmsgSend
803     uint64_t  try_smsg_data_count;
804     uint64_t  try_lmsg_init_count;
805     uint64_t  try_ack_count;
806     uint64_t  try_big_msg_ack_count;
807     uint64_t  try_direct_put_done_count;
808     uint64_t  try_put_done_count;
809     uint64_t  try_smsg_count;
810     
811     double    max_time_in_send_buffered_smsg;
812     double    all_time_in_send_buffered_smsg;
813
814     uint64_t  rdma_get_count, rdma_put_count;
815     uint64_t  try_rdma_get_count, try_rdma_put_count;
816     double    max_time_from_control_to_rdma_init;
817     double    all_time_from_control_to_rdma_init;
818
819     double    max_time_from_rdma_init_to_rdma_done;
820     double    all_time_from_rdma_init_to_rdma_done;
821
822     int      count_in_PumpNetwork;
823     double   time_in_PumpNetwork;
824     double   max_time_in_PumpNetwork;
825     int      count_in_SendBufferMsg_smsg;
826     double   time_in_SendBufferMsg_smsg;
827     double   max_time_in_SendBufferMsg_smsg;
828     int      count_in_SendRdmaMsg;
829     double   time_in_SendRdmaMsg;
830     double   max_time_in_SendRdmaMsg;
831     int      count_in_PumpRemoteTransactions;
832     double   time_in_PumpRemoteTransactions;
833     double   max_time_in_PumpRemoteTransactions;
834     int      count_in_PumpLocalTransactions_rdma;
835     double   time_in_PumpLocalTransactions_rdma;
836     double   max_time_in_PumpLocalTransactions_rdma;
837 } Comm_Thread_Stats;
838
839 static Comm_Thread_Stats   comm_stats;
840
841 static void init_comm_stats()
842 {
843   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
844   if (print_stats){
845       char ln[200];
846       int code = mkdir("counters", 00777); 
847       sprintf(ln,"counters/statistics.%d.%d", mysize, myrank);
848       counterLog=fopen(ln,"w");
849   }
850 }
851
852 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
853
854 #define SMSG_SENT_DONE(creation_time, tag)  \
855         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
856             else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
857             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
858             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
859             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
860             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
861             comm_stats.smsg_count++; \
862             double inbuff_time = CmiWallTimer() - creation_time;   \
863             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
864             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
865         }
866
867 #define SMSG_TRY_SEND(tag)  \
868         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
869             else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
870             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
871             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
872             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
873             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
874             comm_stats.try_smsg_count++; \
875         }
876
877 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
878
879 #define  RDMA_TRANS_DONE(x)      \
880          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
881              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
882              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
883          }
884
885 #define  RDMA_TRANS_INIT(type, x)      \
886          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
887              double rdma_trans_time = CmiWallTimer() - x ; \
888              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
889              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
890          }
891
892 #define STATS_PUMPNETWORK_TIME(x)   \
893         { double t = CmiWallTimer(); \
894           x;        \
895           t = CmiWallTimer() - t;          \
896           comm_stats.count_in_PumpNetwork++;        \
897           comm_stats.time_in_PumpNetwork += t;   \
898           if (t>comm_stats.max_time_in_PumpNetwork)      \
899               comm_stats.max_time_in_PumpNetwork = t;    \
900         }
901
902 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
903         { double t = CmiWallTimer(); \
904           x;        \
905           t = CmiWallTimer() - t;          \
906           comm_stats.count_in_PumpRemoteTransactions ++;        \
907           comm_stats.time_in_PumpRemoteTransactions += t;   \
908           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
909               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
910         }
911
912 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
913         { double t = CmiWallTimer(); \
914           x;        \
915           t = CmiWallTimer() - t;          \
916           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
917           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
918           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
919               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
920         }
921
922 #define STATS_SEND_SMSGS_TIME(x)   \
923         { double t = CmiWallTimer(); \
924           x;        \
925           t = CmiWallTimer() - t;          \
926           comm_stats.count_in_SendBufferMsg_smsg ++;        \
927           comm_stats.time_in_SendBufferMsg_smsg += t;   \
928           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
929               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
930         }
931
/* Runs statement x and accounts its wall-clock duration (difference of two
 * CmiWallTimer() readings) in comm_stats: increments count_in_SendRdmaMsg,
 * adds the elapsed time to time_in_SendRdmaMsg and updates
 * max_time_in_SendRdmaMsg when this run is the longest so far.
 * The expansion is a bare { } block that declares a local named t, so x is
 * evaluated inside that scope. */
932 #define STATS_SENDRDMAMSG_TIME(x)   \
933         { double t = CmiWallTimer(); \
934           x;        \
935           t = CmiWallTimer() - t;          \
936           comm_stats.count_in_SendRdmaMsg ++;        \
937           comm_stats.time_in_SendRdmaMsg += t;   \
938           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
939               comm_stats.max_time_in_SendRdmaMsg = t;    \
940         }
941
942 static void print_comm_stats()
943 {
944     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
945     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
946             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
947             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
948     
949     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
950             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
951             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
952
953     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
954     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
955     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
956
957
958     fprintf(counterLog, "                             count\ttotal_time\tmax\taverage\n", myrank);
959     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.9f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork/comm_stats.count_in_PumpNetwork);
960     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.9f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions/comm_stats.count_in_PumpRemoteTransactions);
961     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.9f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma/comm_stats.count_in_PumpLocalTransactions_rdma);
962     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.9f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg/comm_stats.count_in_SendBufferMsg_smsg);
963     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.9f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg/comm_stats.count_in_SendRdmaMsg);
964 }
965
966 #else
/* #else branch of a conditional whose #if lies outside this excerpt
 * (presumably the statistics switch -- confirm): here every timing wrapper
 * expands to just the wrapped statement, with no timing or accounting. */
967 #define STATS_PUMPNETWORK_TIME(x)                  x
968 #define STATS_SEND_SMSGS_TIME(x)                   x
969 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
970 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
971 #define STATS_SENDRDMAMSG_TIME(x)                  x
972 #endif
973
974 static void
975 allgather(void *in,void *out, int len)
976 {
977     static int *ivec_ptr=NULL,already_called=0,job_size=0;
978     int i,rc;
979     int my_rank;
980     char *tmp_buf,*out_ptr;
981
982     if(!already_called) {
983
984         rc = PMI_Get_size(&job_size);
985         CmiAssert(rc == PMI_SUCCESS);
986         rc = PMI_Get_rank(&my_rank);
987         CmiAssert(rc == PMI_SUCCESS);
988
989         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
990         CmiAssert(ivec_ptr != NULL);
991
992         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
993         CmiAssert(rc == PMI_SUCCESS);
994
995         already_called = 1;
996
997     }
998
999     tmp_buf = (char *)malloc(job_size * len);
1000     CmiAssert(tmp_buf);
1001
1002     rc = PMI_Allgather(in,tmp_buf,len);
1003     CmiAssert(rc == PMI_SUCCESS);
1004
1005     out_ptr = out;
1006
1007     for(i=0;i<job_size;i++) {
1008
1009         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1010
1011     }
1012
1013     free(tmp_buf);
1014 }
1015
1016 static void
1017 allgather_2(void *in,void *out, int len)
1018 {
1019     //PMI_Allgather is out of order
1020     int i,rc, extend_len;
1021     int  rank_index;
1022     char *out_ptr, *out_ref;
1023     char *in2;
1024
1025     extend_len = sizeof(int) + len;
1026     in2 = (char*)malloc(extend_len);
1027
1028     memcpy(in2, &myrank, sizeof(int));
1029     memcpy(in2+sizeof(int), in, len);
1030
1031     out_ptr = (char*)malloc(mysize*extend_len);
1032
1033     rc = PMI_Allgather(in2, out_ptr, extend_len);
1034     GNI_RC_CHECK("allgather", rc);
1035
1036     out_ref = out;
1037
1038     for(i=0;i<mysize;i++) {
1039         //rank index 
1040         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1041         //copy to the rank index slot
1042         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1043     }
1044
1045     free(out_ptr);
1046     free(in2);
1047
1048 }
1049
1050 static unsigned int get_gni_nic_address(int device_id)
1051 {
1052     unsigned int address, cpu_id;
1053     gni_return_t status;
1054     int i, alps_dev_id=-1,alps_address=-1;
1055     char *token, *p_ptr;
1056
1057     p_ptr = getenv("PMI_GNI_DEV_ID");
1058     if (!p_ptr) {
1059         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1060        
1061         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1062     } else {
1063         while ((token = strtok(p_ptr,":")) != NULL) {
1064             alps_dev_id = atoi(token);
1065             if (alps_dev_id == device_id) {
1066                 break;
1067             }
1068             p_ptr = NULL;
1069         }
1070         CmiAssert(alps_dev_id != -1);
1071         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1072         CmiAssert(p_ptr != NULL);
1073         i = 0;
1074         while ((token = strtok(p_ptr,":")) != NULL) {
1075             if (i == alps_dev_id) {
1076                 alps_address = atoi(token);
1077                 break;
1078             }
1079             p_ptr = NULL;
1080             ++i;
1081         }
1082         CmiAssert(alps_address != -1);
1083         address = alps_address;
1084     }
1085     return address;
1086 }
1087
/**
 * Return the protection tag taken from the first non-empty ':'-separated
 * field of the PMI_GNI_PTAG environment variable, converted to uint8_t.
 * The environment string is only read, never modified.  Aborts when the
 * variable is unset or contains no field.
 */
static uint8_t get_ptag(void)
{
    const char *p_ptr;

    p_ptr = getenv("PMI_GNI_PTAG");
    if (p_ptr != NULL) {
        /* skip leading separators, as strtok would */
        while (*p_ptr == ':') p_ptr++;
    }
    if (p_ptr == NULL || *p_ptr == '\0')
        CmiAbort("get_ptag: PMI_GNI_PTAG is not set or is empty");

    /* the conversion stops by itself at the next ':' */
    return (uint8_t)strtol(p_ptr, NULL, 10);
}
1100
/**
 * Return the cookie taken from the first non-empty ':'-separated field of the
 * PMI_GNI_COOKIE environment variable.
 *
 * The value is parsed with strtoul so the whole unsigned 32-bit range is
 * accepted (atoi is undefined above INT_MAX).  The environment string is only
 * read, never modified.  Aborts when the variable is unset or contains no
 * field.
 */
static uint32_t get_cookie(void)
{
    const char *p_ptr;

    p_ptr = getenv("PMI_GNI_COOKIE");
    if (p_ptr != NULL) {
        /* skip leading separators, as strtok would */
        while (*p_ptr == ':') p_ptr++;
    }
    if (p_ptr == NULL || *p_ptr == '\0')
        CmiAbort("get_cookie: PMI_GNI_COOKIE is not set or is empty");

    /* the conversion stops by itself at the next ':' */
    return (uint32_t)strtoul(p_ptr, NULL, 10);
}
1113
1114 #if LARGEPAGE
1115
1116 /* directly mmap memory from hugetlbfs for large pages */
1117
1118 #include <sys/stat.h>
1119 #include <fcntl.h>
1120 #include <sys/mman.h>
1121 #include <hugetlbfs.h>
1122
1123 // size must be _tlbpagesize aligned
1124 void *my_get_huge_pages(size_t size)
1125 {
1126     char filename[512];
1127     int fd;
1128     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1129     void *ptr = NULL;
1130
1131     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1132     fd = open(filename, O_RDWR | O_CREAT, mode);
1133     if (fd == -1) {
1134         CmiAbort("my_get_huge_pages: open filed");
1135     }
1136     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1137     if (ptr == MAP_FAILED) ptr = NULL;
1138 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1139     close(fd);
1140     unlink(filename);
1141     return ptr;
1142 }
1143
/* Unmap `size` bytes at `ptr`; aborts if munmap reports an error. */
void my_free_huge_pages(void *ptr, int size)
{
    if (munmap(ptr, size) == -1)
        CmiAbort("munmap failed in my_free_huge_pages");
}
1150
1151 #endif
1152
1153 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1154 /* TODO: add any that are related */
1155 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1156
1157
1158 #include "machine-lrts.h"
1159 #include "machine-common-core.c"
1160
1161 /* Network progress function: used to poll the network for incoming
1162    messages. This flushes receive buffers on some implementations. */
1163 #if CMK_MACHINE_PROGRESS_DEFINED
void CmiMachineProgressImpl()
{
    /* intentionally empty: this layer does no work in the progress hook */
}
1166 #endif
1167
/* Forward declarations of the send and progress routines; their definitions
 * are not part of this section of the file.  PumpCqWriteTransactions is only
 * declared when CQWRITE is enabled and PumpRemoteTransactions only when
 * REMOTE_EVENT is enabled. */
1168 static int SendBufferMsg(SMSG_QUEUE *queue);
1169 static void SendRdmaMsg();
1170 static void PumpNetworkSmsg();
1171 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1172 #if CQWRITE
1173 static void PumpCqWriteTransactions();
1174 #endif
1175 #if REMOTE_EVENT
1176 static void PumpRemoteTransactions();
1177 #endif
1178
/* Globals that exist only when MACHINE_DEBUG_LOG is enabled.  debugLog starts
 * out NULL; the counters start at 0.  buffered_recv_msg is passed to the
 * MACHSTATE trace calls in this file.
 * NOTE(review): the exact meaning of the two lrts_* counters is not visible
 * here -- confirm against the code that updates them. */
1179 #if MACHINE_DEBUG_LOG
1180 FILE *debugLog = NULL;
1181 static CmiInt8 buffered_recv_msg = 0;
1182 int         lrts_smsg_success = 0;
1183 int         lrts_received_msg = 0;
1184 #endif
1185
1186 static void sweep_mempool(mempool_type *mptr)
1187 {
1188     int n = 0;
1189     block_header *current = &(mptr->block_head);
1190
1191     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1192     while( current!= NULL) {
1193         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1194         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1195     }
1196     printf("[n %d] sweep_mempool slot END.\n", myrank);
1197 }
1198
1199 inline
1200 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1201 {
1202     block_header *current = *from;
1203
1204     //while(register_memory_size>= MAX_REG_MEM)
1205     //{
1206         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1207             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1208
1209         *from = current;
1210         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1211         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1212         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1213     //}
1214     return GNI_RC_SUCCESS;
1215 }
1216
1217 inline 
1218 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1219 {
1220     gni_return_t status = GNI_RC_SUCCESS;
1221     //int size = GetMempoolsize(msg);
1222     //void *blockaddr = GetMempoolBlockPtr(msg);
1223     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1224    
1225     block_header *current = &(mptr->block_head);
1226     while(register_memory_size>= MAX_REG_MEM)
1227     {
1228         status = deregisterMemory(mptr, &current);
1229         if (status != GNI_RC_SUCCESS) break;
1230     }
1231     if(register_memory_size>= MAX_REG_MEM) return status;
1232
1233     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1234     while(1)
1235     {
1236         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1237         if(status == GNI_RC_SUCCESS)
1238         {
1239             break;
1240         }
1241         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1242         {
1243             CmiAbort("Memory registor for mempool fails\n");
1244         }
1245         else
1246         {
1247             status = deregisterMemory(mptr, &current);
1248             if (status != GNI_RC_SUCCESS) break;
1249         }
1250     }; 
1251     return status;
1252 }
1253
1254 inline 
1255 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1256 {
1257     static int rank = -1;
1258     int i;
1259     gni_return_t status;
1260     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1261     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1262     mempool_type *mptr;
1263
1264     status = registerFromMempool(mptr1, msg, size, t, cqh);
1265     if (status == GNI_RC_SUCCESS) return status;
1266 #if CMK_SMP 
1267     for (i=0; i<CmiMyNodeSize()+1; i++) {
1268       rank = (rank+1)%(CmiMyNodeSize()+1);
1269       mptr = CpvAccessOther(mempool, rank);
1270       if (mptr == mptr1) continue;
1271       status = registerFromMempool(mptr, msg, size, t, cqh);
1272       if (status == GNI_RC_SUCCESS) return status;
1273     }
1274 #endif
1275     return  GNI_RC_ERROR_RESOURCE;
1276 }
1277
/*
 * Queue a message for later sending to destNode.
 *
 * A MSG_LIST node is obtained with MallocMsgList and filled with the
 * destination, size, message pointer and tag; where it is queued depends on
 * the build:
 *   - non-SMP: appended to the per-destination list in
 *     queue->smsg_msglist_index[destNode]; a destination whose list was empty
 *     is first linked in front of the list headed by queue->smsg_head_index.
 *   - SMP, ONE_SEND_QUEUE: pushed onto the single queue->sendMsgBuf.
 *   - SMP otherwise: pushed onto the destination's own PCQueue; with
 *     SMP_LOCKS this happens under the destination's lock and a destination
 *     whose `pushed` flag is 0 is also pushed onto nonEmptyQueues.
 * The message itself is not copied; only the pointer is stored.
 */
1278 inline
1279 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1280 {
1281     MSG_LIST        *msg_tmp;
1282     MallocMsgList(msg_tmp);
1283     msg_tmp->destNode = destNode;
1284     msg_tmp->size   = size;
1285     msg_tmp->msg    = msg;
1286     msg_tmp->tag    = tag;
1287 #if CMK_WITH_STATS
    /* presumably stamps the node's creation time for the buffering statistics
       (send_smsg_message reads ptr->creation_time) -- confirm in the macro */
1288     SMSG_CREATION(msg_tmp)
1289 #endif
1290 #if !CMK_SMP
    /* sendSmsgBuf == 0 means nothing is queued for this destination yet */
1291     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1292         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1293         queue->smsg_head_index = destNode;
1294         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1295     }else
1296     {
        /* NOTE(review): msg_tmp->next is not set in this function; presumably
           MallocMsgList initialises it -- confirm */
1297         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1298         queue->smsg_msglist_index[destNode].tail = msg_tmp;
1299     }
1300 #else
1301 #if ONE_SEND_QUEUE
1302     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1303 #else
1304 #if SMP_LOCKS
1305     CmiLock(queue->smsg_msglist_index[destNode].lock);
    /* NOTE(review): `pushed` is only read here; it must be updated elsewhere */
1306     if(queue->smsg_msglist_index[destNode].pushed == 0)
1307     {
1308         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1309     }
1310     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1311     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1312 #else
1313     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1314 #endif
1315 #endif
1316 #endif
1317 #if PRINT_SYH
1318     buffered_smsg_counter++;
1319 #endif
1320 }
1321
1322 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1323 {
1324     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1325 }
1326
1327 inline
1328 static void setup_smsg_connection(int destNode)
1329 {
1330     mdh_addr_list_t  *new_entry = 0;
1331     gni_post_descriptor_t *pd;
1332     gni_smsg_attr_t      *smsg_attr;
1333     gni_return_t status = GNI_RC_NOT_DONE;
1334     RDMA_REQUEST        *rdma_request_msg;
1335     
1336     if(smsg_available_slot == smsg_expand_slots)
1337     {
1338         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1339         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1340         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1341
1342         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1343             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1344             GNI_MEM_READWRITE,   
1345             -1,
1346             &(new_entry->mdh));
1347         smsg_available_slot = 0; 
1348         new_entry->next = smsg_dynamic_list;
1349         smsg_dynamic_list = new_entry;
1350     }
1351     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1352     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1353     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1354     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1355     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1356     smsg_attr->buff_size = smsg_memlen;
1357     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1358     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1359     smsg_local_attr_vec[destNode] = smsg_attr;
1360     smsg_available_slot++;
1361     MallocPostDesc(pd);
1362     pd->type            = GNI_POST_FMA_PUT;
1363     //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1364     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1365     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1366     pd->length          = sizeof(gni_smsg_attr_t);
1367     pd->local_addr      = (uint64_t) smsg_attr;
1368     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1369     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1370     pd->src_cq_hndl     = rdma_tx_cqh;
1371     pd->rdma_mode       = 0;
1372     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1373     print_smsg_attr(smsg_attr);
1374     if(status == GNI_RC_ERROR_RESOURCE )
1375     {
1376         MallocRdmaRequest(rdma_request_msg);
1377         rdma_request_msg->destNode = destNode;
1378         rdma_request_msg->pd = pd;
1379         /* buffer this request */
1380     }
1381 #if PRINT_SYH
1382     if(status != GNI_RC_SUCCESS)
1383        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1384     else
1385         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1386 #endif
1387 }
1388
1389 /* useDynamicSMSG */
1390 inline 
1391 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1392 {
1393     gni_return_t status = GNI_RC_NOT_DONE;
1394
1395     if(mailbox_list->offset == mailbox_list->size)
1396     {
1397         dynamic_smsg_mailbox_t *new_mailbox_entry;
1398         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1399         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1400         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1401         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1402         new_mailbox_entry->offset = 0;
1403         
1404         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1405             new_mailbox_entry->size, smsg_rx_cqh,
1406             GNI_MEM_READWRITE,   
1407             -1,
1408             &(new_mailbox_entry->mem_hndl));
1409
1410         GNI_RC_CHECK("register", status);
1411         new_mailbox_entry->next = mailbox_list;
1412         mailbox_list = new_mailbox_entry;
1413     }
1414     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1415     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1416     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1417     local_smsg_attr->mbox_offset = mailbox_list->offset;
1418     mailbox_list->offset += smsg_memlen;
1419     local_smsg_attr->buff_size = smsg_memlen;
1420     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1421     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1422 }
1423
1424 /* useDynamicSMSG */
1425 inline 
1426 static int connect_to(int destNode)
1427 {
1428     gni_return_t status = GNI_RC_NOT_DONE;
1429     CmiAssert(smsg_connected_flag[destNode] == 0);
1430     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1431     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1432     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1433     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1434     
1435     CMI_GNI_LOCK(global_gni_lock)
1436     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1437     CMI_GNI_UNLOCK(global_gni_lock)
1438     if (status == GNI_RC_ERROR_RESOURCE) {
1439       /* possibly destNode is making connection at the same time */
1440       free(smsg_attr_vector_local[destNode]);
1441       smsg_attr_vector_local[destNode] = NULL;
1442       free(smsg_attr_vector_remote[destNode]);
1443       smsg_attr_vector_remote[destNode] = NULL;
1444       mailbox_list->offset -= smsg_memlen;
1445       return 0;
1446     }
1447     GNI_RC_CHECK("GNI_Post", status);
1448     smsg_connected_flag[destNode] = 1;
1449     return 1;
1450 }
1451
/*
 * Try to send one small message (data or control) of `size` bytes with tag
 * `tag` to destNode through GNI_SmsgSendWTag.
 *
 * inbuff: 0 when the message comes straight from a caller; it is then queued
 *         with buffer_small_msgs() whenever it cannot be sent now.  Non-zero
 *         when the message is already queued: it is never re-queued here and
 *         the send is attempted even if the destination has a backlog.
 * ptr:    queue entry of an already-buffered message; only read (for its
 *         creation_time) when CMK_WITH_STATS is on.  May be NULL.
 *
 * Returns GNI_RC_SUCCESS when GNI accepted the message, GNI_RC_NOT_DONE when
 * dynamic SMSG is used and smsg_connected_flag[destNode] is still 0 or 1, and
 * GNI_RC_ERROR_RESOURCE otherwise (send skipped because of a backlog, or
 * GNI_SmsgSendWTag failed for any reason).
 */
1452 inline 
1453 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1454 {
1455     unsigned int          remote_address;
1456     uint32_t              remote_id;
1457     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1458     gni_smsg_attr_t       *smsg_attr;
1459     gni_post_descriptor_t *pd;
1460     gni_post_state_t      post_state;
1461     char                  *real_data; 
1462
1463     if (useDynamicSMSG) {
1464         switch (smsg_connected_flag[destNode]) {
1465         case 0: 
            /* the result of connect_to is ignored; case 0 deliberately falls
               through so the message is buffered either way */
1466             connect_to(destNode);         /* continue to case 1 */
1467         case 1:                           /* pending connection, do nothing */
1468             status = GNI_RC_NOT_DONE;
1469             if(inbuff ==0)
1470                 buffer_small_msgs(queue, msg, size, destNode, tag);
1471             return status;
1472         }
1473     }
    /* Send only when nothing is queued ahead of this message for the
       destination, or when the message itself comes from the queue.  In SMP
       builds with ONE_SEND_QUEUE the block below is unconditional. */
1474 #if CMK_SMP
1475 #if ! ONE_SEND_QUEUE
1476     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1477 #endif
1478     {
1479 #else
1480     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1481     {
1482 #endif
1483         //CMI_GNI_LOCK(smsg_mailbox_lock)
1484         CMI_GNI_LOCK(default_tx_cq_lock)
1485 #if CMK_SMP_TRACE_COMMTHREAD
1486         int oldpe = -1;
1487         int oldeventid = -1;
1488         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1489         { 
1490             START_EVENT();
            /* for control messages the payload is the buffer named by source_addr */
1491             if ( tag == SMALL_DATA_TAG)
1492                 real_data = (char*)msg; 
1493             else 
1494                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1495             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1496             TRACE_COMM_SET_COMM_MSGID(real_data);
1497         }
1498 #endif
1499 #if REMOTE_EVENT
        /* a control message with seq_id 0 that has no ack slot yet
           (ack_index == -1) gets one from ackPool, keyed by its source buffer */
1500         if (tag == LMSG_INIT_TAG) {
1501             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1502             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1503                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1504         }
1505         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1506 #endif
1507 #if     CMK_WITH_STATS
1508         SMSG_TRY_SEND(tag)
1509 #endif
1510 #if CMK_WITH_STATS
    /* reference time for the statistics: now for a message that was never
       buffered, otherwise the time recorded in its queue entry */
1511     double              creation_time;
1512     if (ptr == NULL)
1513         creation_time = CmiWallTimer();
1514     else
1515         creation_time = ptr->creation_time;
1516 #endif
1517
1518     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1519 #if CMK_SMP_TRACE_COMMTHREAD
        /* restore the trace ids saved before the send */
1520         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1521 #endif
1522         CMI_GNI_UNLOCK(default_tx_cq_lock)
1523         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1524         if(status == GNI_RC_SUCCESS)
1525         {
1526 #if     CMK_WITH_STATS
1527             SMSG_SENT_DONE(creation_time,tag) 
1528 #endif
1529 #if CMK_SMP_TRACE_COMMTHREAD
1530             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1531             { 
1532                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1533             }
1534 #endif
1535         }else
            /* every send failure is reported to the caller as a resource error */
1536             status = GNI_RC_ERROR_RESOURCE;
1537     }
    /* not sent and not yet queued: keep the message for a later retry */
1538     if(status != GNI_RC_SUCCESS && inbuff ==0)
1539         buffer_small_msgs(queue, msg, size, destNode, tag);
1540     return status;
1541 }
1542
1543 inline 
1544 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1545 {
1546     /* construct a control message and send */
1547     CONTROL_MSG         *control_msg_tmp;
1548     MallocControlMsg(control_msg_tmp);
1549     control_msg_tmp->source_addr = (uint64_t)msg;
1550     control_msg_tmp->seq_id    = seqno;
1551     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1552 #if REMOTE_EVENT
1553     control_msg_tmp->ack_index    =  -1;
1554 #endif
1555 #if     USE_LRTS_MEMPOOL
1556     if(size < BIG_MSG)
1557     {
1558         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1559     }
1560     else
1561     {
1562         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1563         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1564         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1565     }
1566 #else
1567     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1568 #endif
1569     return control_msg_tmp;
1570 }
1571
1572 #define BLOCKING_SEND_CONTROL    0
1573
1574 // Large message, send control to receiver, receiver register memory and do a GET, 
1575 // return 1 - send no success
1576 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1577 {
1578     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1579     uint32_t            vmdh_index  = -1;
1580     int                 size;
1581     int                 offset = 0;
1582     uint64_t            source_addr;
1583     int                 register_size; 
1584     void                *msg;
1585
1586     size    =   control_msg_tmp->total_length;
1587     source_addr = control_msg_tmp->source_addr;
1588     register_size = control_msg_tmp->length;
1589
1590 #if  USE_LRTS_MEMPOOL
1591     if( control_msg_tmp->seq_id == 0 ){
1592 #if BLOCKING_SEND_CONTROL
1593         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1594             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1595                 LrtsAdvanceCommunication(0);
1596         }
1597 #endif
1598         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1599         {
1600             msg = (void*)source_addr;
1601             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1602             {
1603                 if(!inbuff)
1604                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1605                 return GNI_RC_ERROR_NOMEM;
1606             }
1607             //register the corresponding mempool
1608             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1609             if(status == GNI_RC_SUCCESS)
1610             {
1611                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1612             }
1613         }else
1614         {
1615             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1616             status = GNI_RC_SUCCESS;
1617         }
1618         if(NoMsgInSend(source_addr))
1619             register_size = GetMempoolsize((void*)(source_addr));
1620         else
1621             register_size = 0;
1622     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1623     {
1624         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1625         source_addr += offset;
1626         size = control_msg_tmp->length;
1627 #if BLOCKING_SEND_CONTROL
1628         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1629             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1630                 LrtsAdvanceCommunication(0);
1631         }
1632 #endif
1633         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1634             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1635             {
1636                 if(!inbuff)
1637                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1638                 return GNI_RC_ERROR_NOMEM;
1639             }
1640             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1641             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1642         }
1643         else
1644         {
1645             status = GNI_RC_SUCCESS;
1646         }
1647         register_size = 0;  
1648     }
1649
1650     if(status == GNI_RC_SUCCESS)
1651     {
1652         status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
1653         if(status == GNI_RC_SUCCESS)
1654         {
1655             buffered_send_msg += register_size;
1656             if(control_msg_tmp->seq_id == 0)
1657             {
1658                 IncreaseMsgInSend(source_addr);
1659             }
1660             FreeControlMsg(control_msg_tmp);
1661             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1662         }else
1663             status = GNI_RC_ERROR_RESOURCE;
1664
1665     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1666     {
1667         CmiAbort("Memory registor for large msg\n");
1668     }else 
1669     {
1670         status = GNI_RC_ERROR_NOMEM; 
1671         if(!inbuff)
1672             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1673     }
1674     return status;
1675 #else
1676     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1677     if(status == GNI_RC_SUCCESS)
1678     {
1679         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);  
1680         if(status == GNI_RC_SUCCESS)
1681         {
1682             FreeControlMsg(control_msg_tmp);
1683         }
1684     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1685     {
1686         CmiAbort("Memory registor for large msg\n");
1687     }else 
1688     {
1689         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1690     }
1691     return status;
1692 #endif
1693 }
1694
/* Prepare the envelope of `msg` for sending: record `size` in it with
 * CmiSetMsgSize, then apply CMI_SET_CHECKSUM to the message. */
inline void LrtsPrepareEnvelope(char *msg, int size)
{
    CmiSetMsgSize(msg, size);

    CMI_SET_CHECKSUM(msg, size);
}
1700
1701 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1702 {
1703     gni_return_t        status  =   GNI_RC_SUCCESS;
1704     uint8_t tag;
1705     CONTROL_MSG         *control_msg_tmp;
1706     int                 oob = ( mode & OUT_OF_BAND);
1707     SMSG_QUEUE          *queue;
1708
1709     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1710 #if CMK_USE_OOB
1711     queue = oob? &smsg_oob_queue : &smsg_queue;
1712 #else
1713     queue = &smsg_queue;
1714 #endif
1715
1716     LrtsPrepareEnvelope(msg, size);
1717
1718 #if PRINT_SYH
1719     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1720 #endif 
1721 #if CMK_SMP 
1722     if(size <= SMSG_MAX_MSG)
1723         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1724     else if (size < BIG_MSG) {
1725         control_msg_tmp =  construct_control_msg(size, msg, 0);
1726         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1727     }
1728     else {
1729           CmiSetMsgSeq(msg, 0);
1730           control_msg_tmp =  construct_control_msg(size, msg, 1);
1731           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1732     }
1733 #else   //non-smp, smp(worker sending)
1734     if(size <= SMSG_MAX_MSG)
1735     {
1736         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1737             CmiFree(msg);
1738     }
1739     else if (size < BIG_MSG) {
1740         control_msg_tmp =  construct_control_msg(size, msg, 0);
1741         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1742     }
1743     else {
1744 #if     USE_LRTS_MEMPOOL
1745         CmiSetMsgSeq(msg, 0);
1746         control_msg_tmp =  construct_control_msg(size, msg, 1);
1747         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1748 #else
1749         control_msg_tmp =  construct_control_msg(size, msg, 0);
1750         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1751 #endif
1752     }
1753 #endif
1754     return 0;
1755 }
1756
/**
 * Send the same message to every PE in a list; the caller keeps ownership
 * of msg.
 *
 * With CMK_BROADCAST_USE_CMIREFERENCE, remote destinations take an extra
 * reference (CmiReference) and use CmiSyncSendAndFree, while a send to
 * the calling PE itself uses CmiSyncSend.  Without it every destination
 * uses CmiSyncSend.
 *
 * @param npes  number of entries in pes
 * @param pes   destination PE numbers
 * @param len   message length in bytes
 * @param msg   message buffer
 */
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
  int idx = 0;

  while (idx < npes) {
    const int target = pes[idx++];
#if CMK_BROADCAST_USE_CMIREFERENCE
    if (target != CmiMyPe()) {
      CmiReference(msg);
      CmiSyncSendAndFree(target, len, msg);
      continue;
    }
#endif
    CmiSyncSend(target, len, msg);
  }
}
1775
1776 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1777 {
1778   /* A better asynchronous implementation may be wanted, but at least it works */
1779   CmiSyncListSendFn(npes, pes, len, msg);
1780   return (CmiCommHandle) 0;
1781 }
1782
/**
 * Send a message to every PE in a list and release the caller's ownership
 * of msg.
 *
 * @param npes  number of entries in pes (0 is allowed: msg is just freed)
 * @param pes   destination PE numbers
 * @param len   message length in bytes
 * @param msg   message buffer; must not be used by the caller afterwards
 */
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
  /* Single destination: hand the buffer straight to the send-and-free path. */
  if (npes == 1) {
      CmiSyncSendAndFree(pes[0], len, msg);
      return;
  }
#if CMK_PERSISTENT_COMM
  if (CpvAccess(phs) && len > 1024) {
      int i;
      for(i=0;i<npes;i++) {
        if (pes[i] == CmiMyPe())
          CmiSyncSend(pes[i], len, msg);
        else {
          /* each remote send consumes one reference */
          CmiReference(msg);
          CmiSyncSendAndFree(pes[i], len, msg);
        }
      }
      CmiFree(msg);
      return;
  }
#endif
  
#if CMK_BROADCAST_USE_CMIREFERENCE
  /* npes == 1 was already handled above, so no second check is needed. */
  CmiSyncListSendFn(npes, pes, len, msg);
  CmiFree(msg);
#else
  {
    int i;
    /* All but the last destination use a plain send ... */
    for(i=0;i<npes-1;i++) {
      CmiSyncSend(pes[i], len, msg);
    }
    /* ... and the last one (if any) gives the buffer away. */
    if (npes>0)
      CmiSyncSendAndFree(pes[npes-1], len, msg);
    else 
      CmiFree(msg);
  }
#endif
}
1823
static void    PumpDatagramConnection();

/*
 * Default user-event ids for tracing.  registerUserTraceEvents() replaces
 * them with ids returned by traceRegisterUserEvent() when that code is
 * compiled in.  Every default must be unique: event_SendBufferSmsg used to
 * share 444 with event_PumpRdmaTransaction, which made the two events
 * indistinguishable whenever the defaults are used.
 */
static      int         event_SetupConnect = 111;
static      int         event_PumpSmsg = 222 ;
static      int         event_PumpTransaction = 333;
static      int         event_PumpRdmaTransaction = 444;
static      int         event_SendBufferSmsg = 777;
static      int         event_SendFmaRdmaMsg = 555;
static      int         event_AdvanceCommunication = 666;
1832
/**
 * Register the machine layer's user trace events and store the returned
 * ids in the event_* variables.  Does nothing unless
 * CMI_MPI_TRACE_USEREVENTS and CMK_TRACE_ENABLED are set and
 * CMK_TRACE_IN_CHARM is not.
 *
 * NOTE(review): every registration passes -1 as the requested id, so the
 * ids presumably depend on call order; the order is kept unchanged.
 */
static void registerUserTraceEvents()
{
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    event_SetupConnect =
        traceRegisterUserEvent("setting up connections", -1 );
    event_PumpSmsg =
        traceRegisterUserEvent("Pump network small msgs", -1);
    event_PumpTransaction =
        traceRegisterUserEvent("Pump FMA local transaction" , -1);
    event_PumpRdmaTransaction =
        traceRegisterUserEvent("Pump RDMA local transaction" , -1);
    event_SendBufferSmsg =
        traceRegisterUserEvent("Sending buffered small msgs", -1);
    event_SendFmaRdmaMsg =
        traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
    event_AdvanceCommunication =
        traceRegisterUserEvent("Worker thread in sending/receiving", -1);
#endif
}
1844
1845 static void ProcessDeadlock()
1846 {
1847     static CmiUInt8 *ptr = NULL;
1848     static CmiUInt8  last = 0, mysum, sum;
1849     static int count = 0;
1850     gni_return_t status;
1851     int i;
1852
1853 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1854 //sweep_mempool(CpvAccess(mempool));
1855     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1856     mysum = smsg_send_count + smsg_recv_count;
1857     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1858     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1859     GNI_RC_CHECK("PMI_Allgather", status);
1860     sum = 0;
1861     for (i=0; i<mysize; i++)  sum+= ptr[i];
1862     if (last == 0 || sum == last) 
1863         count++;
1864     else
1865         count = 0;
1866     last = sum;
1867     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1868     if (count == 2) { 
1869         /* detected twice, it is a real deadlock */
1870         if (myrank == 0)  {
1871             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1872             CmiAbort("Fatal> Deadlock detected.");
1873         }
1874
1875     }
1876     _detected_hang = 0;
1877 }
1878
1879 static void CheckProgress()
1880 {
1881     if (smsg_send_count == last_smsg_send_count &&
1882         smsg_recv_count == last_smsg_recv_count ) 
1883     {
1884         _detected_hang = 1;
1885 #if !CMK_SMP
1886         if (_detected_hang) ProcessDeadlock();
1887 #endif
1888
1889     }
1890     else {
1891         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1892         last_smsg_send_count = smsg_send_count;
1893         last_smsg_recv_count = smsg_recv_count;
1894         _detected_hang = 0;
1895     }
1896 }
1897
1898 static void set_limit()
1899 {
1900     //if (!user_set_flag && CmiMyRank() == 0) {
1901     if (CmiMyRank() == 0) {
1902         int mynode = CmiPhysicalNodeID(CmiMyPe());
1903         int numpes = CmiNumPesOnPhysicalNode(mynode);
1904         int numprocesses = numpes / CmiMyNodeSize();
1905         MAX_REG_MEM  = _totalmem / numprocesses;
1906         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1907         if (CmiMyPe() == 0)
1908            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1909         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1910         {
1911              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1912              CmiAbort("memory registration\n");
1913         }
1914     }
1915 }
1916
1917 void LrtsPostCommonInit(int everReturn)
1918 {
1919 #if CMK_DIRECT
1920     CmiDirectInit();
1921 #endif
1922 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1923     CpvInitialize(double, projTraceStart);
1924     /* only PE 0 needs to care about registration (to generate sts file). */
1925     //if (CmiMyPe() == 0) 
1926     {
1927         registerMachineUserEventsFunction(&registerUserTraceEvents);
1928     }
1929 #endif
1930
1931 #if CMK_SMP
1932     CmiIdleState *s=CmiNotifyGetState();
1933     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1934     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1935 #else
1936     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1937     if (useDynamicSMSG)
1938     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1939 #endif
1940
1941 #if ! LARGEPAGE
1942     if (_checkProgress)
1943 #if CMK_SMP
1944     if (CmiMyRank() == 0)
1945 #endif
1946     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1947 #endif
1948  
1949 #if !LARGEPAGE
1950     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1951 #endif
1952 }
1953
/*
 * Called by a worker thread.
 *
 * Only does work when MULTI_THREAD_SEND is enabled and more than one
 * process is running.  It then runs the communication steps in order:
 * receive small messages, poll the FMA and RDMA transmit queues, poll
 * remote events (REMOTE_EVENT), flush buffered small messages and flush
 * buffered RDMA requests.  With CMK_WORKER_SINGLE_TASK each step is run
 * only by the worker whose rank modulo 6 matches the step number.
 */
void LrtsPostNonLocal(){
#if CMK_SMP_TRACE_COMMTHREAD
    double t_begin, t_end;
#endif
#if MULTI_THREAD_SEND
    if (mysize == 1) {
        return;
    }

#if CMK_SMP_TRACE_COMMTHREAD
    traceEndIdle();
    t_begin = CmiWallTimer();
#endif

    /* step 0: incoming small messages */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 0)
#endif
    {
        PumpNetworkSmsg();
    }

    /* step 1: completions on the default (FMA) transmit queue */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 1)
#endif
    {
        PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
    }

    /* step 2: completions on the RDMA transmit queue */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 2)
#endif
    {
        PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
    }

#if REMOTE_EVENT
    /* step 3: remote completion events */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 3)
#endif
    {
        PumpRemoteTransactions();
    }
#endif

    /* step 4: buffered small messages; with OOB enabled the regular queue
     * is only attempted when the OOB flush returned 1 */
#if CMK_USE_OOB
    if (SendBufferMsg(&smsg_oob_queue) == 1)
#endif
    {
#if CMK_WORKER_SINGLE_TASK
        if (CmiMyRank() % 6 == 4)
#endif
        {
            SendBufferMsg(&smsg_queue);
        }
    }

    /* step 5: buffered FMA/RDMA requests */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 5)
#endif
    {
        SendRdmaMsg();
    }

#if CMK_SMP_TRACE_COMMTHREAD
    t_end = CmiWallTimer();
    traceUserBracketEvent(event_AdvanceCommunication, t_begin, t_end);
    traceBeginIdle();
#endif
#endif
}
2015
2016 /* useDynamicSMSG */
2017 static void    PumpDatagramConnection()
2018 {
2019     uint32_t          remote_address;
2020     uint32_t          remote_id;
2021     gni_return_t status;
2022     gni_post_state_t  post_state;
2023     uint64_t          datagram_id;
2024     int i;
2025
2026    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2027    {
2028        if (datagram_id >= mysize) {           /* bound endpoint */
2029            int pe = datagram_id - mysize;
2030            CMI_GNI_LOCK(global_gni_lock)
2031            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2032            CMI_GNI_UNLOCK(global_gni_lock)
2033            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2034            {
2035                CmiAssert(remote_id == pe);
2036                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2037                GNI_RC_CHECK("Dynamic SMSG Init", status);
2038 #if PRINT_SYH
2039                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
2040 #endif
2041                CmiAssert(smsg_connected_flag[pe] == 1);
2042                smsg_connected_flag[pe] = 2;
2043            }
2044        }
2045        else {         /* unbound ep */
2046            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2047            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2048            {
2049                CmiAssert(remote_id<mysize);
2050                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2051                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2052                GNI_RC_CHECK("Dynamic SMSG Init", status);
2053 #if PRINT_SYH
2054                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
2055 #endif
2056                smsg_connected_flag[remote_id] = 2;
2057
2058                alloc_smsg_attr(&send_smsg_attr);
2059                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2060                GNI_RC_CHECK("post unbound datagram", status);
2061            }
2062        }
2063    }
2064 }
2065
/* polling CQ to receive network message.
 * Currently a stub: the body performs no work. */
static void PumpNetworkRdmaMsgs()
{
}
2073
2074 inline 
2075 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
2076 {
2077     RDMA_REQUEST        *rdma_request_msg;
2078     MallocRdmaRequest(rdma_request_msg);
2079     rdma_request_msg->destNode = inst_id;
2080     rdma_request_msg->pd = pd;
2081 #if REMOTE_EVENT
2082     rdma_request_msg->ack_index = ack_index;
2083 #endif
2084 #if CMK_SMP
2085     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2086 #else
2087     if(sendRdmaBuf == 0)
2088     {
2089         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
2090     }else{
2091         sendRdmaTail->next = rdma_request_msg;
2092         sendRdmaTail =  rdma_request_msg;
2093     }
2094 #endif
2095
2096 }
2097
2098 static void getLargeMsgRequest(void* header, uint64_t inst_id);
2099
2100 static void PumpNetworkSmsg()
2101 {
2102     uint64_t            inst_id;
2103     int                 ret;
2104     gni_cq_entry_t      event_data;
2105     gni_return_t        status, status2;
2106     void                *header;
2107     uint8_t             msg_tag;
2108     int                 msg_nbytes;
2109     void                *msg_data;
2110     gni_mem_handle_t    msg_mem_hndl;
2111     gni_smsg_attr_t     *smsg_attr;
2112     gni_smsg_attr_t     *remote_smsg_attr;
2113     int                 init_flag;
2114     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2115     uint64_t            source_addr;
2116     SMSG_QUEUE         *queue = &smsg_queue;
2117 #if   CMK_DIRECT
2118     cmidirectMsg        *direct_msg;
2119 #endif
2120 #if CMI_EXERT_RECV_CAP
2121     int                  recv_cnt = 0;
2122 #endif
2123     while(1)
2124     {
2125         CMI_GNI_LOCK(smsg_rx_cq_lock)
2126         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2127         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2128         if(status != GNI_RC_SUCCESS)
2129             break;
2130         inst_id = GNI_CQ_GET_INST_ID(event_data);
2131 #if REMOTE_EVENT
2132         inst_id = GET_RANK(inst_id);      /* important */
2133 #endif
2134         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2135 #if PRINT_SYH
2136         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2137 #endif
2138         if (useDynamicSMSG) {
2139             /* subtle: smsg may come before connection is setup */
2140             while (smsg_connected_flag[inst_id] != 2) 
2141                PumpDatagramConnection();
2142         }
2143         msg_tag = GNI_SMSG_ANY_TAG;
2144         while(1) {
2145             CMI_GNI_LOCK(smsg_mailbox_lock)
2146             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2147             if (status != GNI_RC_SUCCESS)
2148             {
2149                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2150                 break;
2151             }
2152 #if PRINT_SYH
2153             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2154 #endif
2155             /* copy msg out and then put into queue (small message) */
2156             switch (msg_tag) {
2157             case SMALL_DATA_TAG:
2158             {
2159                 START_EVENT();
2160                 msg_nbytes = CmiGetMsgSize(header);
2161                 msg_data    = CmiAlloc(msg_nbytes);
2162                 memcpy(msg_data, (char*)header, msg_nbytes);
2163                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2164                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2165                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
2166                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2167                 handleOneRecvedMsg(msg_nbytes, msg_data);
2168                 break;
2169             }
2170             case LMSG_INIT_TAG:
2171             {
2172 #if MULTI_THREAD_SEND
2173                 MallocControlMsg(control_msg_tmp);
2174                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2175                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2176                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2177                 getLargeMsgRequest(control_msg_tmp, inst_id);
2178                 FreeControlMsg(control_msg_tmp);
2179 #else
2180                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2181                 getLargeMsgRequest(header, inst_id);
2182                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2183 #endif
2184                 break;
2185             }
2186 #if !REMOTE_EVENT && !CQWRITE
2187             case ACK_TAG:   //msg fit into mempool
2188             {
2189                 /* Get is done, release message . Now put is not used yet*/
2190                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2191                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2192                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2193 #if ! USE_LRTS_MEMPOOL
2194                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2195 #else
2196                 DecreaseMsgInSend(msg);
2197 #endif
2198                 if(NoMsgInSend(msg))
2199                     buffered_send_msg -= GetMempoolsize(msg);
2200                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2201                 CmiFree(msg);
2202                 break;
2203             }
2204 #endif
2205             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2206             {
2207 #if MULTI_THREAD_SEND
2208                 MallocControlMsg(header_tmp);
2209                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2210                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2211 #else
2212                 header_tmp = (CONTROL_MSG *) header;
2213 #endif
2214                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2215                 void *msg = (void*)(header_tmp->source_addr);
2216                 int cur_seq = CmiGetMsgSeq(msg);
2217                 int offset = ONE_SEG*(cur_seq+1);
2218                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2219                 buffered_send_msg -= header_tmp->length;
2220                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2221                 if (remain_size < 0) remain_size = 0;
2222                 CmiSetMsgSize(msg, remain_size);
2223                 if(remain_size <= 0) //transaction done
2224                 {
2225                     CmiFree(msg);
2226                 }else if (header_tmp->total_length > offset)
2227                 {
2228                     CmiSetMsgSeq(msg, cur_seq+1);
2229                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2230                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2231                     //send next seg
2232                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2233                          // pipelining
2234                     if (header_tmp->seq_id == 1) {
2235                       int i;
2236                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2237                         int seq = cur_seq+i+2;
2238                         CmiSetMsgSeq(msg, seq-1);
2239                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2240                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2241                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2242                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2243                       }
2244                     }
2245                 }
2246 #if MULTI_THREAD_SEND
2247                 FreeControlMsg(header_tmp);
2248 #else
2249                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2250 #endif
2251                 break;
2252             }
2253 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2254             case PUT_DONE_TAG:  {   //persistent message
2255                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2256                 int size = ((CONTROL_MSG *) header)->length;
2257                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2258                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2259                 CmiReference(msg);
2260                 CMI_CHECK_CHECKSUM(msg, size);
2261                 handleOneRecvedMsg(size, msg); 
2262 #if PRINT_SYH
2263                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2264 #endif
2265                 break;
2266             }
2267 #endif
2268 #if CMK_DIRECT
2269             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2270                 //create a trigger message
2271                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2272                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2273                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2274                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2275                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2276                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2277                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2278                 break;
2279 #endif
2280             default:
2281                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2282                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2283                 printf("weird tag problem\n");
2284                 CmiAbort("Unknown tag\n");
2285             }               // end switch
2286 #if PRINT_SYH
2287             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2288 #endif
2289             smsg_recv_count ++;
2290             msg_tag = GNI_SMSG_ANY_TAG;
2291 #if CMI_EXERT_RECV_CAP
2292             if (status == GNI_RC_SUCCESS && ++recv_cnt == RECV_CAP) return;
2293 #endif
2294         } //endwhile GNI_SmsgGetNextWTag
2295     }   //end while GetEvent
2296     if(status == GNI_RC_ERROR_RESOURCE)
2297     {
2298         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2299         GNI_RC_CHECK("Smsg_rx_cq full", status);
2300     }
2301 }
2302
2303 static void printDesc(gni_post_descriptor_t *pd)
2304 {
2305     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2306 }
2307
#if CQWRITE
/**
 * Post a CQ-write carrying a 64-bit value to a remote node.
 *
 * @param destNode  destination node (index into ep_hndl_array)
 * @param data      value written to the remote completion queue
 * @param mem_hndl  remote memory handle placed in the descriptor
 *
 * Aborts through GNI_RC_CHECK if the post fails.
 */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_post_descriptor_t *desc;
    gni_return_t           rc;

    MallocPostDesc(desc);
    desc->type            = GNI_POST_CQWRITE;
    /* silent mode; the alternative once considered was
     * GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT */
    desc->cq_mode         = GNI_CQMODE_SILENT;
    desc->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    desc->cqwrite_value   = data;
    desc->remote_mem_hndl = mem_hndl;

    rc = GNI_PostCqWrite(ep_hndl_array[destNode], desc);
    GNI_RC_CHECK("GNI_PostCqWrite", rc);
}
#endif
2325
2326 // register memory for a message
2327 // return mem handle
2328 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2329 {
2330     gni_return_t status = GNI_RC_SUCCESS;
2331
2332     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2333
2334 #if CMK_PERSISTENT_COMM
2335       // persistent message is always registered
2336       // BIG_MSG small pieces do not have malloc chunk header
2337     if ((seqno <= 1 || seqno == PERSIST_SEQ) && !IsMemHndlZero(MEMHFIELD(msg))) {
2338         *memh = MEMHFIELD(msg);
2339         return GNI_RC_SUCCESS;
2340     }
2341 #endif
2342     if(seqno == 0 
2343 #if CMK_PERSISTENT_COMM
2344          || seqno == PERSIST_SEQ
2345 #endif
2346       )
2347     {
2348         if(IsMemHndlZero((GetMemHndl(msg))))
2349         {
2350             msg = (void*)(msg);
2351             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2352             if(status == GNI_RC_SUCCESS)
2353                 *memh = GetMemHndl(msg);
2354         }
2355         else {
2356             *memh = GetMemHndl(msg);
2357         }
2358     }
2359     else {
2360         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2361         status = registerMemory(msg, size, memh, NULL); 
2362     }
2363     return status;
2364 }
2365
2366 // for BIG_MSG called on receiver side for receiving control message
2367 // LMSG_INIT_TAG
2368 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2369 {
2370 #if     USE_LRTS_MEMPOOL
2371     CONTROL_MSG         *request_msg;
2372     gni_return_t        status = GNI_RC_SUCCESS;
2373     void                *msg_data;
2374     gni_post_descriptor_t *pd;
2375     gni_mem_handle_t    msg_mem_hndl;
2376     int                 size, transaction_size, offset = 0;
2377     size_t              register_size = 0;
2378
2379     // initial a get to transfer data from the sender side */
2380     request_msg = (CONTROL_MSG *) header;
2381     size = request_msg->total_length;
2382     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2383     MallocPostDesc(pd);
2384 #if CMK_WITH_STATS 
2385     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2386 #endif
2387     if(request_msg->seq_id < 2)   {
2388 #if CMK_SMP_TRACE_COMMTHREAD 
2389         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2390 #endif
2391         msg_data = CmiAlloc(size);
2392         CmiSetMsgSeq(msg_data, 0);
2393         _MEMCHECK(msg_data);
2394     }
2395     else {
2396         offset = ONE_SEG*(request_msg->seq_id-1);
2397         msg_data = (char*)request_msg->dest_addr + offset;
2398     }
2399    
2400     pd->cqwrite_value = request_msg->seq_id;
2401
2402     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2403     SetMemHndlZero(pd->local_mem_hndl);
2404     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2405     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2406         if(NoMsgInRecv( (void*)(msg_data)))
2407             register_size = GetMempoolsize((void*)(msg_data));
2408     }
2409
2410     pd->first_operand = ALIGN64(size);                   //  total length
2411
2412     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2413         pd->type            = GNI_POST_FMA_GET;
2414     else
2415         pd->type            = GNI_POST_RDMA_GET;
2416     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2417     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2418     pd->length          = transaction_size;
2419     pd->local_addr      = (uint64_t) msg_data;
2420     pd->remote_addr     = request_msg->source_addr + offset;
2421     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2422     pd->src_cq_hndl     = rdma_tx_cqh;
2423     pd->rdma_mode       = 0;
2424     pd->amo_cmd         = 0;
2425 #if USE_RDMA_CAP
2426     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2427 #endif
2428     //memory registration success
2429     if(status == GNI_RC_SUCCESS )
2430     {
2431         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2432         CMI_GNI_LOCK(lock)
2433 #if REMOTE_EVENT
2434         if( request_msg->seq_id == 0)
2435         {
2436             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2437             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2438             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2439         }
2440 #endif
2441
2442 #if CMK_WITH_STATS
2443         RDMA_TRY_SEND(pd->type)
2444 #endif
2445         if(pd->type == GNI_POST_RDMA_GET) 
2446         {
2447             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2448         }
2449         else
2450         {
2451             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2452         }
2453         CMI_GNI_UNLOCK(lock)
2454
2455         if(status == GNI_RC_SUCCESS )
2456         {
2457 #if USE_RDMA_CAP
2458             RDMA_pending++;
2459 #endif
2460             if(pd->cqwrite_value == 0)
2461             {
2462 #if MACHINE_DEBUG_LOG
2463                 buffered_recv_msg += register_size;
2464                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2465 #endif
2466                 IncreaseMsgInRecv(msg_data);
2467 #if CMK_SMP_TRACE_COMMTHREAD 
2468                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2469 #endif
2470             }
2471 #if  CMK_WITH_STATS
2472             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2473             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2474 #endif
2475         }
2476     }else
2477     {
2478         SetMemHndlZero((pd->local_mem_hndl));
2479     }
2480     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2481     {
2482 #if REMOTE_EVENT
2483         bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
2484 #else
2485         bufferRdmaMsg(inst_id, pd, -1); 
2486 #endif
2487     }else if (status != GNI_RC_SUCCESS) {
2488         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2489         GNI_RC_CHECK("GetLargeAFter posting", status);
2490     }
2491 #else
2492     CONTROL_MSG         *request_msg;
2493     gni_return_t        status;
2494     void                *msg_data;
2495     gni_post_descriptor_t *pd;
2496     RDMA_REQUEST        *rdma_request_msg;
2497     gni_mem_handle_t    msg_mem_hndl;
2498     //int source;
2499     // initial a get to transfer data from the sender side */
2500     request_msg = (CONTROL_MSG *) header;
2501     msg_data = CmiAlloc(request_msg->length);
2502     _MEMCHECK(msg_data);
2503
2504     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2505
2506     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2507     {
2508         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2509     }
2510
2511     MallocPostDesc(pd);
2512     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2513         pd->type            = GNI_POST_FMA_GET;
2514     else
2515         pd->type            = GNI_POST_RDMA_GET;
2516     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2517     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2518     pd->length          = ALIGN64(request_msg->length);
2519     pd->local_addr      = (uint64_t) msg_data;
2520     pd->remote_addr     = request_msg->source_addr;
2521     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2522     pd->src_cq_hndl     = rdma_tx_cqh;
2523     pd->rdma_mode       = 0;
2524     pd->amo_cmd         = 0;
2525
2526     //memory registration successful
2527     if(status == GNI_RC_SUCCESS)
2528     {
2529         pd->local_mem_hndl  = msg_mem_hndl;
2530        
2531         if(pd->type == GNI_POST_RDMA_GET) 
2532         {
2533             CMI_GNI_LOCK(rdma_tx_cq_lock)
2534             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2535             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2536         }
2537         else
2538         {
2539             CMI_GNI_LOCK(default_tx_cq_lock)
2540             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2541             CMI_GNI_UNLOCK(default_tx_cq_lock)
2542         }
2543
2544     }else
2545     {
2546         SetMemHndlZero(pd->local_mem_hndl);
2547     }
2548     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2549     {
2550         MallocRdmaRequest(rdma_request_msg);
2551         rdma_request_msg->next = 0;
2552         rdma_request_msg->destNode = inst_id;
2553         rdma_request_msg->pd = pd;
2554         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2555     }else {
2556         GNI_RC_CHECK("AFter posting", status);
2557     }
2558 #endif
2559 }
2560
2561 #if CQWRITE
/*
 * PumpCqWriteTransactions: poll rdma_rx_cqh and handle every event that is
 * currently available (only compiled when CQWRITE is enabled).
 *
 * The low 48 bits of each event's data word are taken as a message address.
 *  - With CMK_PERSISTENT_COMM, a message whose MEMHFIELD is non-zero is
 *    referenced, checksummed and passed to handleOneRecvedMsg().
 *  - Otherwise DecreaseMsgInSend() is applied to the message,
 *    buffered_send_msg is reduced by GetMempoolsize() once NoMsgInSend()
 *    holds, and the message is released with CmiFree().
 *
 * The loop ends on the first GNI_CqGetEvent() status other than
 * GNI_RC_SUCCESS; GNI_RC_ERROR_RESOURCE is then reported via GNI_RC_CHECK.
 */
2562 static void PumpCqWriteTransactions()
2563 {
2564
2565     gni_cq_entry_t          ev;
2566     gni_return_t            status;
2567     void                    *msg;  
2568     int                     msg_size;
2569     while(1) {
2570         //CMI_GNI_LOCK(my_cq_lock) 
2571         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2572         //CMI_GNI_UNLOCK(my_cq_lock)
2573         if(status != GNI_RC_SUCCESS) break;
        /* keep only the low 48 bits of the event data as the message pointer */
2574         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2575 #if CMK_PERSISTENT_COMM
2576 #if PRINT_SYH
2577         printf(" %d CQ write event %p\n", myrank, msg);
2578 #endif
        /* non-zero memory handle field: deliver the buffer as a received message */
2579         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2580 #if PRINT_SYH
2581             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2582 #endif
2583             CmiReference(msg);
2584             msg_size = CmiGetMsgSize(msg);
2585             CMI_CHECK_CHECKSUM(msg, msg_size);
2586             handleOneRecvedMsg(msg_size, msg); 
2587             continue;
2588         }
2589 #endif
2590         DecreaseMsgInSend(msg);
2591 #if ! USE_LRTS_MEMPOOL
2592        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2593 #else
        /* NOTE(review): DecreaseMsgInSend(msg) already ran unconditionally just
         * above, so with USE_LRTS_MEMPOOL it runs twice per event -- confirm
         * this double call is intended. */
2594         DecreaseMsgInSend(msg);
2595 #endif
2596         if(NoMsgInSend(msg))
2597             buffered_send_msg -= GetMempoolsize(msg);
2598         CmiFree(msg);
2599     };
    /* the loop only exits with a non-success status; treat a resource error as fatal */
2600     if(status == GNI_RC_ERROR_RESOURCE)
2601     {
2602         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2603     }
2604 }
2605 #endif
2606
2607 #if REMOTE_EVENT
/*
 * PumpRemoteTransactions: poll rdma_rx_cqh (under global_gni_lock) and handle
 * every remote event currently available (only compiled when REMOTE_EVENT
 * is enabled).
 *
 * The instance id carried by each event is split with GET_INDEX()/GET_TYPE():
 *  - type 0 (ACK): the index selects a message in ackPool; DecreaseMsgInSend()
 *    is applied, buffered_send_msg is reduced once NoMsgInSend() holds, the
 *    message is freed and the ackPool slot is released.
 *  - type 1 (PERSISTENT, CMK_PERSISTENT_COMM only): the index selects a
 *    PersistentReceivesTable entry in persistPool whose first destination
 *    buffer is delivered through handleOneRecvedMsg().
 *  - anything else aborts.
 *
 * The loop ends on the first GNI_CqGetEvent() status other than
 * GNI_RC_SUCCESS; GNI_RC_ERROR_RESOURCE is then reported via GNI_RC_CHECK.
 */
2608 static void PumpRemoteTransactions()
2609 {
2610     gni_cq_entry_t          ev;
2611     gni_return_t            status;
2612     void                    *msg;   
2613     int                     inst_id, index, type, size;
2614
2615     while(1) {
2616         CMI_GNI_LOCK(global_gni_lock)
2617         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2618         CMI_GNI_UNLOCK(global_gni_lock)
2619
2620         if(status != GNI_RC_SUCCESS) break;
2621
        /* the event's instance id encodes both a pool index and an event type */
2622         inst_id = GNI_CQ_GET_INST_ID(ev);
2623         index = GET_INDEX(inst_id);
2624         type = GET_TYPE(inst_id);
2625
2626         switch (type) {
2627         case 0:    // ACK
2628             CmiAssert(GetIndexType(ackPool, index) == 1);
2629             msg = GetIndexAddress(ackPool, index);
2630             DecreaseMsgInSend(msg);
2631 #if ! USE_LRTS_MEMPOOL
2632            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2633 #else
            /* NOTE(review): DecreaseMsgInSend(msg) already ran unconditionally
             * just above, so with USE_LRTS_MEMPOOL it runs twice per ACK --
             * confirm this double call is intended. */
2634             DecreaseMsgInSend(msg);
2635 #endif
2636             if(NoMsgInSend(msg))
2637                 buffered_send_msg -= GetMempoolsize(msg);
2638             CmiFree(msg);
2639             IndexPool_freeslot(&ackPool, index);
2640             break;
2641 #if CMK_PERSISTENT_COMM
2642         case 1:  {    // PERSISTENT
2643             CmiAssert(GetIndexType(persistPool, index) == 2);
2644             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
            /* the message is read from the slot's first destination buffer */
2645             msg = slot->destBuf[0].destAddress;
2646             size = CmiGetMsgSize(msg);
2647             CmiReference(msg);
2648             CMI_CHECK_CHECKSUM(msg, size);
2649             handleOneRecvedMsg(size, msg); 
2650             break;
2651             }
2652 #endif
2653         default:
2654             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2655             CmiAbort("PumpRemoteTransactions: unknown type");
2656         }
2657     }
    /* the loop only exits with a non-success status; treat a resource error as fatal */
2658     if(status == GNI_RC_ERROR_RESOURCE)
2659     {
2660         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2661     }
2662 }
2663 #endif
2664
2665 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2666 {
2667     gni_cq_entry_t          ev;
2668     gni_return_t            status;
2669     uint64_t                type, inst_id;
2670     gni_post_descriptor_t   *tmp_pd;
2671     MSG_LIST                *ptr;
2672     CONTROL_MSG             *ack_msg_tmp;
2673     ACK_MSG                 *ack_msg;
2674     uint8_t                 msg_tag;
2675 #if CMK_DIRECT
2676     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2677 #endif
2678     SMSG_QUEUE         *queue = &smsg_queue;
2679
2680     while(1) {
2681         CMI_GNI_LOCK(my_cq_lock) 
2682         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2683         CMI_GNI_UNLOCK(my_cq_lock)
2684         if(status != GNI_RC_SUCCESS) break;
2685         
2686         type = GNI_CQ_GET_TYPE(ev);
2687         if (type == GNI_CQ_EVENT_TYPE_POST)
2688         {
2689
2690 #if USE_RDMA_CAP
2691             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2692             RDMA_pending--;
2693 #endif
2694             inst_id     = GNI_CQ_GET_INST_ID(ev);
2695 #if PRINT_SYH
2696             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2697 #endif
2698             CMI_GNI_LOCK(my_cq_lock)
2699             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2700             CMI_GNI_UNLOCK(my_cq_lock)
2701
2702             switch (tmp_pd->type) {
2703 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2704             case GNI_POST_RDMA_PUT:
2705 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2706                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2707 #endif
2708             case GNI_POST_FMA_PUT:
2709                 if(tmp_pd->amo_cmd == 1) {
2710 #if CMK_DIRECT
2711                     //sender ACK to receiver to trigger it is done
2712                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2713                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2714                     msg_tag = DIRECT_PUT_DONE_TAG;
2715 #endif
2716                 }
2717                 else {
2718                     CmiFree((void *)tmp_pd->local_addr);
2719 #if REMOTE_EVENT
2720                     FreePostDesc(tmp_pd);
2721                     continue;
2722 #elif CQWRITE
2723                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2724                     FreePostDesc(tmp_pd);
2725                     continue;
2726 #else
2727                     MallocControlMsg(ack_msg_tmp);
2728                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2729                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2730                     ack_msg_tmp->length  = tmp_pd->length;
2731                     msg_tag = PUT_DONE_TAG;
2732 #endif
2733                 }
2734                 break;
2735 #endif
2736             case GNI_POST_RDMA_GET:
2737             case GNI_POST_FMA_GET:  {
2738 #if  ! USE_LRTS_MEMPOOL
2739                 MallocControlMsg(ack_msg_tmp);
2740                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2741                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2742                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2743                 msg_tag = ACK_TAG;  
2744 #else
2745 #if CMK_WITH_STATS
2746                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2747 #endif
2748                 int seq_id = tmp_pd->cqwrite_value;
2749                 if(seq_id > 0)      // BIG_MSG
2750                 {
2751                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2752                     MallocControlMsg(ack_msg_tmp);
2753                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2754                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2755                     ack_msg_tmp->seq_id = seq_id;
2756                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2757                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2758                     ack_msg_tmp->length = tmp_pd->length;
2759                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2760                     msg_tag = BIG_MSG_TAG; 
2761                 } 
2762                 else
2763                 {
2764                     msg_tag = ACK_TAG; 
2765 #if  !REMOTE_EVENT && !CQWRITE
2766                     MallocAckMsg(ack_msg);
2767                     ack_msg->source_addr = tmp_pd->remote_addr;
2768 #endif
2769                 }
2770 #endif
2771                 break;
2772             }
2773             case  GNI_POST_CQWRITE:
2774                    FreePostDesc(tmp_pd);
2775                    continue;
2776             default:
2777                 CmiPrintf("type=%d\n", tmp_pd->type);
2778                 CmiAbort("PumpLocalTransactions: unknown type!");
2779             }      /* end of switch */
2780
2781 #if CMK_DIRECT
2782             if (tmp_pd->amo_cmd == 1) {
2783                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2784                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2785             }
2786             else
2787 #endif
2788             if (msg_tag == ACK_TAG) {
2789 #if !REMOTE_EVENT
2790 #if   !CQWRITE
2791                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2792                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2793 #else
2794                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2795 #endif
2796 #endif
2797             }
2798             else {
2799                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2800                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2801             }
2802 #if CMK_PERSISTENT_COMM
2803             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2804 #endif
2805             {
2806                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2807 #if PRINT_SYH
2808                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2809 #endif
2810                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2811                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2812
2813                     START_EVENT();
2814                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2815                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2816 #if MACHINE_DEBUG_LOG
2817                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2818                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2819                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2820 #endif
2821                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2822                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2823                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2824                 }else if(msg_tag == BIG_MSG_TAG){
2825                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2826                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2827                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2828                         START_EVENT();
2829 #if PRINT_SYH
2830                         printf("Pipeline msg done [%d]\n", myrank);
2831 #endif
2832 #if                 CMK_SMP_TRACE_COMMTHREAD
2833                         if( tmp_pd->cqwrite_value == 1)
2834                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2835 #endif
2836                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2837                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2838                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2839                     }
2840                 }
2841             }
2842             FreePostDesc(tmp_pd);
2843         }
2844     } //end while
2845     if(status == GNI_RC_ERROR_RESOURCE)
2846     {
2847         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2848         GNI_RC_CHECK("Smsg_tx_cq full", status);
2849     }
2850 }
2851
/*
 * SendRdmaMsg: retry the RDMA/FMA post descriptors buffered in sendRdmaBuf.
 *
 * For each buffered request the local message is (re)registered with
 * registerMessage() and, if that succeeds, posted with GNI_PostRdma() or
 * GNI_PostFma() depending on the descriptor type.
 *
 *  - CMK_SMP: sendRdmaBuf is a PCQueue.  At most the number of entries
 *    present on entry are popped; an entry whose registration or post fails
 *    is pushed back.  A failed post also ends the pass.
 *  - non-SMP: sendRdmaBuf is a singly linked list walked with pre/ptr.
 *    Successfully posted entries are unlinked and freed; failed ones stay in
 *    place.  sendRdmaTail is updated when the walk reaches the end.
 */
2852 static void  SendRdmaMsg()
2853 {
2854     gni_return_t            status = GNI_RC_SUCCESS;
2855     gni_mem_handle_t        msg_mem_hndl;
2856     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2857     RDMA_REQUEST            *pre = 0;
2858     uint64_t                register_size = 0;
2859     void                    *msg;
2860     int                     i;
2861 #if CMK_SMP
    /* bound the pass by the initial length so re-pushed entries are not retried in this call */
2862     int len = PCQueueLength(sendRdmaBuf);
2863     for (i=0; i<len; i++)
2864     {
2865 #if USE_RDMA_CAP
2866          if( RDMA_pending >= RDMA_cap) break;
2867 #endif
2868         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2869         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2870         CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2871         if (ptr == NULL) break;
2872 #else
2873     ptr = sendRdmaBuf;
    /* NOTE(review): this condition reads RDMA_pending/RDMA_cap outside the
     * USE_RDMA_CAP guard used everywhere else in this function -- confirm
     * both names exist when USE_RDMA_CAP is 0. */
2874     while (ptr!=0 && RDMA_pending < RDMA_cap)
2875     {
2876 #if USE_RDMA_CAP
2877          if( RDMA_pending >= RDMA_cap) break;
2878 #endif
2879 #endif 
2880         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2881         gni_post_descriptor_t *pd = ptr->pd;
2882         
2883         msg = (void*)(pd->local_addr);
2884         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
        /* register_size is only consumed by the MACHINE_DEBUG_LOG accounting below */
2885         register_size = 0;
2886         if(pd->cqwrite_value == 0) {
2887             if(NoMsgInRecv(msg))
2888                 register_size = GetMempoolsize(msg);
2889         }
2890
2891         if(status == GNI_RC_SUCCESS)        //mem register good
2892         {
            /* RDMA posts are serialized with rdma_tx_cq_lock, FMA posts with default_tx_cq_lock */
2893             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2894             CMI_GNI_LOCK(lock);
2895 #if REMOTE_EVENT
            /* cqwrite_value == 0: request a remote event that carries the ack index */
2896             if( pd->cqwrite_value == 0) {
2897                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
                /* NOTE(review): sts holds a GNI return code but is declared int */
2898                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2899                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2900             }
2901 #if CMK_PERSISTENT_COMM
            /* persistent transfers request a remote event tagged with PERSIST_EVENT instead */
2902             else if (pd->cqwrite_value == PERSIST_SEQ) {
2903                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2904                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, PERSIST_EVENT(ptr->ack_index));
2905                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2906             }
2907 #endif
2908 #endif
2909 #if CMK_WITH_STATS
2910             RDMA_TRY_SEND(pd->type)
2911 #endif
2912             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2913             {
2914                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2915             }
2916             else
2917             {
2918                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2919             }
2920             CMI_GNI_UNLOCK(lock);
2921             
2922             if(status == GNI_RC_SUCCESS)    //post good
2923             {
2924 #if USE_RDMA_CAP
2925                 RDMA_pending ++;
2926 #endif
2927 #if !CMK_SMP
                /* unlink the posted request from the list and release it */
2928                 tmp_ptr = ptr;
2929                 if(pre != 0) {
2930                     pre->next = ptr->next;
2931                 }
2932                 else {
2933                     sendRdmaBuf = ptr->next;
2934                 }
2935                 ptr = ptr->next;
2936                 FreeRdmaRequest(tmp_ptr);
2937 #endif
2938                 if(pd->cqwrite_value == 0)
2939                 {
2940 #if CMK_SMP_TRACE_COMMTHREAD 
2941                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2942 #endif
2943                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2944                 }
2945 #if  CMK_WITH_STATS
2946                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2947                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2948 #endif
2949 #if MACHINE_DEBUG_LOG
2950                 buffered_recv_msg += register_size;
2951                 MACHSTATE(8, "GO request from buffered\n"); 
2952 #endif
2953 #if PRINT_SYH
2954                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
2955 #endif
2956             }else           // cannot post
2957             {
                /* keep the request for a later call and stop this pass */
2958 #if CMK_SMP
2959                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2960 #else
2961                 pre = ptr;
2962                 ptr = ptr->next;
2963 #endif
2964 #if PRINT_SYH
2965                 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank, pd->cqwrite_value);
2966 #endif
2967                 break;
2968             }
2969         } else          //memory registration fails
2970         {
            /* keep the request for a later call; unlike a failed post, the pass continues */
2971 #if CMK_SMP
2972             PCQueuePush(sendRdmaBuf, (char*)ptr);
2973 #else
2974             pre = ptr;
2975             ptr = ptr->next;
2976 #endif
2977         }
2978     } //end while
2979 #if ! CMK_SMP
    /* the walk reached the end of the list: pre is the last remaining entry (or 0) */
2980     if(ptr == 0)
2981         sendRdmaTail = pre;
2982 #endif
2983 }
2984
2985 // return 1 if all messages are sent
2986 static int SendBufferMsg(SMSG_QUEUE *queue)
2987 {
2988     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2989     CONTROL_MSG         *control_msg_tmp;
2990     gni_return_t        status;
2991     int                 done = 1;
2992     uint64_t            register_size;
2993     void                *register_addr;
2994     int                 index_previous = -1;
2995 #if CMI_EXERT_SEND_CAP
2996     int                 sent_cnt = 0;
2997 #endif
2998
2999 #if CMK_SMP
3000     int          index = 0;
3001 #if ONE_SEND_QUEUE
3002     memset(destpe_avail, 0, mysize * sizeof(char));
3003     for (index=0; index<1; index++)
3004     {
3005         int i, len = PCQueueLength(queue->sendMsgBuf);
3006         for (i=0; i<len; i++) 
3007         {
3008             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3009             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3010             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3011             if(ptr == NULL) break;
3012             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3013                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3014                 continue;
3015             }
3016 #else
3017 #if SMP_LOCKS
3018     int nonempty = PCQueueLength(nonEmptyQueues);
3019     for(index =0; index<nonempty; index++)
3020     {
3021         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
3022         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
3023         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
3024         if(current_list == NULL) break; 
3025         PCQueue current_queue= current_list-> sendSmsgBuf;
3026         CmiLock(current_list->lock);
3027         int i, len = PCQueueLength(current_queue);
3028         current_list->pushed = 0;
3029         CmiUnlock(current_list->lock);
3030 #else
3031     for(index =0; index<mysize; index++)
3032     {
3033         //if (index == myrank) continue;
3034         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3035         int i, len = PCQueueLength(current_queue);
3036 #endif
3037         for (i=0; i<len; i++) 
3038         {
3039             CMI_PCQUEUEPOP_LOCK(current_queue)
3040             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3041             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3042             if (ptr == 0) break;
3043 #endif
3044 #else
3045     int index = queue->smsg_head_index;
3046     while(index != -1)
3047     {
3048         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
3049         pre = 0;
3050         while(ptr != 0)
3051         {
3052 #endif
3053             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3054             status = GNI_RC_ERROR_RESOURCE;
3055             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
3056                 /* connection not exists yet */
3057             }
3058             else
3059             switch(ptr->tag)
3060             {
3061             case SMALL_DATA_TAG:
3062                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3063                 if(status == GNI_RC_SUCCESS)
3064                 {
3065                     CmiFree(ptr->msg);
3066                 }
3067                 break;
3068             case LMSG_INIT_TAG:
3069                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3070                 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
3071                 break;
3072 #if !REMOTE_EVENT && !CQWRITE
3073             case ACK_TAG:
3074                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3075                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3076                 break;
3077 #endif
3078             case BIG_MSG_TAG:
3079                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3080                 if(status == GNI_RC_SUCCESS)
3081                 {
3082                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3083                 }
3084                 break;
3085 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3086             case PUT_DONE_TAG:
3087                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3088                 if(status == GNI_RC_SUCCESS)
3089                 {
3090                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
3091                 }
3092                 break;
3093 #endif
3094 #if CMK_DIRECT
3095             case DIRECT_PUT_DONE_TAG:
3096                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3097                 if(status == GNI_RC_SUCCESS)
3098                 {
3099                     free((CMK_DIRECT_HEADER*)ptr->msg);
3100                 }
3101                 break;
3102
3103 #endif
3104             default:
3105                 printf("Weird tag\n");
3106                 CmiAbort("should not happen\n");
3107             }       // end switch
3108             if(status == GNI_RC_SUCCESS)
3109             {
3110 #if PRINT_SYH
3111                 buffered_smsg_counter--;
3112                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3113 #endif
3114 #if !CMK_SMP
3115                 tmp_ptr = ptr;
3116                 if(pre)
3117                 {
3118                     ptr = pre ->next = ptr->next;
3119                 }else
3120                 {
3121                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
3122                 }
3123                 FreeMsgList(tmp_ptr);
3124 #else
3125                 FreeMsgList(ptr);
3126 #endif
3127 #if CMI_EXERT_SEND_CAP
3128                 if(++sent_cnt == SEND_CAP) break;
3129 #endif
3130             }else {
3131 #if CMK_SMP
3132 #if ONE_SEND_QUEUE
3133                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3134 #else
3135                 PCQueuePush(current_queue, (char*)ptr);
3136 #endif
3137 #else
3138                 pre = ptr;
3139                 ptr=ptr->next;
3140 #endif
3141                 done = 0;
3142                 if(status == GNI_RC_ERROR_RESOURCE)
3143                 {
3144 #if CMK_SMP && ONE_SEND_QUEUE 
3145                     destpe_avail[ptr->destNode] = 1;
3146 #else
3147                     break;
3148 #endif
3149                 }
3150             } 
3151         } //end while
3152 #if !CMK_SMP
3153         if(ptr == 0)
3154             queue->smsg_msglist_index[index].tail = pre;
3155         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
3156         {
3157             if(index_previous != -1)
3158                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
3159             else
3160                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
3161         }
3162         else {
3163             index_previous = index;
3164         }
3165         index = queue->smsg_msglist_index[index].next;
3166 #else
3167 #if !ONE_SEND_QUEUE && SMP_LOCKS
3168         CmiLock(current_list->lock);
3169         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3170         {
3171             current_list->pushed = 1;
3172             PCQueuePush(nonEmptyQueues, current_list);
3173         }
3174         CmiUnlock(current_list->lock); 
3175 #endif
3176 #endif
3177
3178 #if CMI_EXERT_SEND_CAP
3179         if(sent_cnt == SEND_CAP) {
3180             done = 0;
3181             break;
3182         }
3183 #endif
3184     }   // end pooling for all cores
3185     return done;
3186 }
3187
3188 static void ProcessDeadlock();
/*
 * LrtsAdvanceCommunication: run one round of communication progress.
 *
 * In order, this call:
 *   1. pumps datagram connections (only when useDynamicSMSG and whileidle),
 *   2. pumps incoming SMSG traffic (PumpNetworkSmsg),
 *   3. pumps local completions on default_tx_cqh,
 *   4. pumps local completions on rdma_tx_cqh, then CQ-write and/or remote
 *      events when CQWRITE / REMOTE_EVENT are enabled,
 *   5. retries buffered RDMA/FMA posts (SendRdmaMsg),
 *   6. retries buffered SMSG sends (SendBufferMsg),
 *   7. runs ProcessDeadlock() if _detected_hang is set (CMK_SMP && !LARGEPAGE).
 *
 * With CMK_SMP_TRACE_COMMTHREAD each stage is timed and reported through
 * traceUserBracketEvent() when it lasted at least TRACE_THRESHOLD.
 *
 * whileidle: non-zero enables the datagram-connection pump in step 1.
 */
3189 void LrtsAdvanceCommunication(int whileidle)
3190 {
    /* NOTE(review): count is not referenced anywhere in this function body */
3191     static int count = 0;
3192     /*  Receive Msg first */
3193 #if CMK_SMP_TRACE_COMMTHREAD
3194     double startT, endT;
3195 #endif
3196     if (useDynamicSMSG && whileidle)
3197     {
3198 #if CMK_SMP_TRACE_COMMTHREAD
3199         startT = CmiWallTimer();
3200 #endif
3201         PumpDatagramConnection();
3202 #if CMK_SMP_TRACE_COMMTHREAD
3203         endT = CmiWallTimer();
3204         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3205 #endif
3206     }
3207
3208 #if CMK_SMP_TRACE_COMMTHREAD
3209     startT = CmiWallTimer();
3210 #endif
3211     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3212     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
3213 #if CMK_SMP_TRACE_COMMTHREAD
3214     endT = CmiWallTimer();
3215     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3216 #endif
3217
3218 #if CMK_SMP_TRACE_COMMTHREAD
3219     startT = CmiWallTimer();
3220 #endif
3221     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3222     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3223 #if CMK_SMP_TRACE_COMMTHREAD
3224     endT = CmiWallTimer();
3225     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3226 #endif
3227
    /* the interval started here covers the rdma_tx_cqh pump and the
     * CQ-write / remote-event pumps that follow it */
3228 #if CMK_SMP_TRACE_COMMTHREAD
3229     startT = CmiWallTimer();
3230 #endif
3231     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3232
3233 #if CQWRITE
3234     PumpCqWriteTransactions();
3235 #endif
3236
3237 #if REMOTE_EVENT
3238     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
3239 #endif
3240
3241     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
3242 #if CMK_SMP_TRACE_COMMTHREAD
3243     endT = CmiWallTimer();
3244     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3245 #endif
3246  
3247 #if CMK_SMP_TRACE_COMMTHREAD
3248     startT = CmiWallTimer();
3249 #endif
3250     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
3251     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
3252 #if CMK_SMP_TRACE_COMMTHREAD
3253     endT = CmiWallTimer();
3254     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3255 #endif
3256
3257     /* Send buffered Message */
3258 #if CMK_SMP_TRACE_COMMTHREAD
3259     startT = CmiWallTimer();
3260 #endif
    /* with CMK_USE_OOB the regular queue is only flushed when the
     * out-of-band queue reported that everything was sent */
3261 #if CMK_USE_OOB
3262     if (SendBufferMsg(&smsg_oob_queue) == 1)
3263 #endif
3264     {
3265         STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
3266     }
3267     //MACHSTATE(8, "after SendBufferMsg\n") ; 
3268 #if CMK_SMP_TRACE_COMMTHREAD
3269     endT = CmiWallTimer();
3270     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3271 #endif
3272
3273 #if CMK_SMP && ! LARGEPAGE
3274     if (_detected_hang)  ProcessDeadlock();
3275 #endif
3276 }
3277
3278 /* useDynamicSMSG */
3279 static void _init_dynamic_smsg()
3280 {
3281     gni_return_t status;
3282     uint32_t     vmdh_index = -1;
3283     int i;
3284
3285     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3286     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3287     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3288     for(i=0; i<mysize; i++) {
3289         smsg_connected_flag[i] = 0;
3290         smsg_attr_vector_local[i] = NULL;
3291         smsg_attr_vector_remote[i] = NULL;
3292     }
3293     if(mysize <=512)
3294     {
3295         SMSG_MAX_MSG = 4096;
3296     }else if (mysize <= 4096)
3297     {
3298         SMSG_MAX_MSG = 4096/mysize * 1024;
3299     }else if (mysize <= 16384)
3300     {
3301         SMSG_MAX_MSG = 512;
3302     }else {
3303         SMSG_MAX_MSG = 256;
3304     }
3305
3306     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3307     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3308     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3309     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3310     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3311
3312     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3313     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3314     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3315     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3316     mailbox_list->offset = 0;
3317     mailbox_list->next = 0;
3318     
3319     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3320         mailbox_list->size, smsg_rx_cqh,
3321         GNI_MEM_READWRITE,   
3322         vmdh_index,
3323         &(mailbox_list->mem_hndl));
3324     GNI_RC_CHECK("MEMORY registration for smsg", status);
3325
3326     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3327     GNI_RC_CHECK("Unbound EP", status);
3328     
3329     alloc_smsg_attr(&send_smsg_attr);
3330
3331     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3332     GNI_RC_CHECK("post unbound datagram", status);
3333
3334       /* always pre-connect to proc 0 */
3335     //if (myrank != 0) connect_to(0);
3336 }
3337
3338 static void _init_static_smsg()
3339 {
3340     gni_smsg_attr_t      *smsg_attr;
3341     gni_smsg_attr_t      remote_smsg_attr;
3342     gni_smsg_attr_t      *smsg_attr_vec;
3343     gni_mem_handle_t     my_smsg_mdh_mailbox;
3344     int      ret, i;
3345     gni_return_t status;
3346     uint32_t              vmdh_index = -1;
3347     mdh_addr_t            base_infor;
3348     mdh_addr_t            *base_addr_vec;
3349     char *env;
3350
3351     if(mysize <=512)
3352     {
3353         SMSG_MAX_MSG = 1024;
3354     }else if (mysize <= 4096)
3355     {
3356         SMSG_MAX_MSG = 1024;
3357     }else if (mysize <= 16384)
3358     {
3359         SMSG_MAX_MSG = 512;
3360     }else {
3361         SMSG_MAX_MSG = 256;
3362     }
3363     
3364     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3365     if (env) SMSG_MAX_MSG = atoi(env);
3366     CmiAssert(SMSG_MAX_MSG > 0);
3367
3368     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3369     
3370     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3371     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3372     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3373     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3374     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3375     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3376     CmiAssert(ret == 0);
3377     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3378     
3379     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3380             smsg_memlen*(mysize), smsg_rx_cqh,
3381             GNI_MEM_READWRITE,   
3382             vmdh_index,
3383             &my_smsg_mdh_mailbox);
3384     register_memory_size += smsg_memlen*(mysize);
3385     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3386
3387     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3388     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3389
3390     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3391     base_infor.mdh =  my_smsg_mdh_mailbox;
3392     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3393
3394     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3395  
3396     for(i=0; i<mysize; i++)
3397     {
3398         if(i==myrank)
3399             continue;
3400         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3401         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3402         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3403         smsg_attr[i].mbox_offset = i*smsg_memlen;
3404         smsg_attr[i].buff_size = smsg_memlen;
3405         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3406         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3407     }
3408
3409     for(i=0; i<mysize; i++)
3410     {
3411         if (myrank == i) continue;
3412
3413         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3414         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3415         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3416         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3417         remote_smsg_attr.buff_size = smsg_memlen;
3418         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3419         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3420
3421         /* initialize the smsg channel */
3422         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3423         GNI_RC_CHECK("SMSG Init", status);
3424     } //end initialization
3425
3426     free(base_addr_vec);
3427     free(smsg_attr);
3428
3429     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3430     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3431
3432
3433 inline
3434 static void _init_send_queue(SMSG_QUEUE *queue)
3435 {
3436      int i;
3437 #if ONE_SEND_QUEUE
3438      queue->sendMsgBuf = PCQueueCreate();
3439      destpe_avail = (char*)malloc(mysize * sizeof(char));
3440 #else
3441      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3442 #if CMK_SMP && SMP_LOCKS
3443      nonEmptyQueues = PCQueueCreate();
3444 #endif
3445      for(i =0; i<mysize; i++)
3446      {
3447 #if CMK_SMP
3448          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3449 #if SMP_LOCKS
3450          queue->smsg_msglist_index[i].pushed = 0;
3451          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3452 #endif
3453 #else
3454          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
3455          queue->smsg_msglist_index[i].next = -1;
3456          queue->smsg_head_index = -1;
3457 #endif
3458         
3459      }
3460 #endif
3461 }
3462
3463 inline
3464 static void _init_smsg()
3465 {
3466     if(mysize > 1) {
3467         if (useDynamicSMSG)
3468             _init_dynamic_smsg();
3469         else
3470             _init_static_smsg();
3471     }
3472
3473     _init_send_queue(&smsg_queue);