make hugepage as default for uGNI
[charm.git] / src / arch / gni / machine.c
1
2 /** @file
3  * GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on the total mempool size allocated; this prevents the mempool
18     # from using too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #define DIRECT_SEQ 0xFFFFFFE 
48 #include "cmidirect.h"
49 #endif
50
51 #if REGULARPAGE 
52 #define     LARGEPAGE              0
53 #else
54 #define     LARGEPAGE              1
55 #endif
56
57 #if CMK_SMP
58 #define MULTI_THREAD_SEND          0
59 #define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
60 #endif
61
62 #if MULTI_THREAD_SEND
63 #define CMK_WORKER_SINGLE_TASK     1
64 #endif
65
66 #define REMOTE_EVENT               1
67 #define CQWRITE                    0
68
69 #define CMI_EXERT_SEND_LARGE_CAP   0
70 #define CMI_EXERT_RECV_RDMA_CAP    0
71
72
73 #define CMI_SENDBUFFERSMSG_CAP            0
74 #define CMI_PUMPNETWORKSMSG_CAP           0
75 #define CMI_PUMPREMOTETRANSACTIONS_CAP    0
76 #define CMI_PUMPLOCALTRANSACTIONS_CAP     0
77
78 #if CMI_SENDBUFFERSMSG_CAP
79 int     SendBufferMsg_cap  = 20;
80 #endif
81
82 #if CMI_PUMPNETWORKSMSG_CAP
83 int     PumpNetworkSmsg_cap = 20;
84 #endif
85
86 #if CMI_PUMPREMOTETRANSACTIONS_CAP
87 int     PumpRemoteTransactions_cap = 20;
88 #endif
89
/* Cap value belonging to the CMI_PUMPLOCALTRANSACTIONS_CAP switch (presumably
 * consulted by PumpLocalTransactions -- verify at the use site).  It is guarded
 * by its own switch so that enabling the local cap alone defines the variable;
 * previously it was only compiled in when the *remote* cap was enabled. */
#if CMI_PUMPLOCALTRANSACTIONS_CAP
int     PumpLocalTransactions_cap = 15;
#endif
93
94 #if CMI_EXERT_SEND_LARGE_CAP
95 static int SEND_large_cap = 20;
96 static int SEND_large_pending = 0;
97 #endif
98
99 #if CMI_EXERT_RECV_RDMA_CAP
100 static int   RDMA_cap =   10;
101 static int   RDMA_pending = 0;
102 #endif
103
104 #define USE_LRTS_MEMPOOL                  1
105
106 #define PRINT_SYH                         0
107
108 // Trace communication thread
109 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
110 #define TRACE_THRESHOLD     0.00001
111 #define CMI_MPI_TRACE_MOREDETAILED 0
112 #undef CMI_MPI_TRACE_USEREVENTS
113 #define CMI_MPI_TRACE_USEREVENTS 1
114 #else
115 #undef CMK_SMP_TRACE_COMMTHREAD
116 #define CMK_SMP_TRACE_COMMTHREAD 0
117 #endif
118
119 #define CMK_TRACE_COMMOVERHEAD 0
120 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
121 #undef CMI_MPI_TRACE_USEREVENTS
122 #define CMI_MPI_TRACE_USEREVENTS 1
123 #else
124 #undef CMK_TRACE_COMMOVERHEAD
125 #define CMK_TRACE_COMMOVERHEAD 0
126 #endif
127
128 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
129 CpvStaticDeclare(double, projTraceStart);
130 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
131 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
132 #define  EVENT_TIME()   CpvAccess(projTraceStart)
133 #else
134 #define  START_EVENT()
135 #define  END_EVENT(x)
136 #define  EVENT_TIME()   (0.0)
137 #endif
138
139 #if USE_LRTS_MEMPOOL
140
141 #define oneMB (1024ll*1024)
142 #define oneGB (1024ll*1024*1024)
143
144 static CmiInt8 _mempool_size = 8*oneMB;
145 static CmiInt8 _expand_mem =  4*oneMB;
146 static CmiInt8 _mempool_size_limit = 0;
147
148 static CmiInt8 _totalmem = 0.8*oneGB;
149
150 #if LARGEPAGE
151 static CmiInt8 BIG_MSG  =  16*oneMB;
152 static CmiInt8 ONE_SEG  =  4*oneMB;
153 #else
154 static CmiInt8 BIG_MSG  =  4*oneMB;
155 static CmiInt8 ONE_SEG  =  2*oneMB;
156 #endif
157 #if MULTI_THREAD_SEND
158 static int BIG_MSG_PIPELINE = 1;
159 #else
160 static int BIG_MSG_PIPELINE = 4;
161 #endif
162
163 // dynamic flow control
164 static CmiInt8 buffered_send_msg = 0;
165 static CmiInt8 register_memory_size = 0;
166
167 #if LARGEPAGE
168 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
169 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
170 static CmiInt8  register_count = 0;
171 #else
172 #if CMK_SMP && COMM_THREAD_SEND 
173 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
174 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
175 #else
176 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
177 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
178 #endif
179
180
181 #endif
182
183 #endif     /* end USE_LRTS_MEMPOOL */
184
185 #if MULTI_THREAD_SEND 
186 #define     CMI_GNI_LOCK(x)       CmiLock(x);
187 #define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
188 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
189 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
190 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
191 #else
192 #define     CMI_GNI_LOCK(x)
193 #define     CMI_GNI_TRYLOCK(x)         (0)
194 #define     CMI_GNI_UNLOCK(x)
195 #define     CMI_PCQUEUEPOP_LOCK(Q)   
196 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
197 #endif
198
199 static int _tlbpagesize = 4096;
200
201 //static int _smpd_count  = 0;
202
203 static int   user_set_flag  = 0;
204
205 static int _checkProgress = 1;             /* check deadlock */
206 static int _detected_hang = 0;
207
208 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
209
210 // dynamic SMSG
211 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
212
213 static int avg_smsg_connection = 32;
214 static int                 *smsg_connected_flag= 0;
215 static gni_smsg_attr_t     **smsg_attr_vector_local;
216 static gni_smsg_attr_t     **smsg_attr_vector_remote;
217 static gni_ep_handle_t     ep_hndl_unbound;
218 static gni_smsg_attr_t     send_smsg_attr;
219 static gni_smsg_attr_t     recv_smsg_attr;
220
221 typedef struct _dynamic_smsg_mailbox{
222    void     *mailbox_base;
223    int      size;
224    int      offset;
225    gni_mem_handle_t  mem_hndl;
226    struct      _dynamic_smsg_mailbox  *next;
227 }dynamic_smsg_mailbox_t;
228
229 static dynamic_smsg_mailbox_t  *mailbox_list;
230
231 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
232 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
233
234 #if PRINT_SYH
235 int         lrts_send_msg_id = 0;
236 int         lrts_local_done_msg = 0;
237 int         lrts_send_rdma_success = 0;
238 #endif
239
240 #include "machine.h"
241
242 #include "pcqueue.h"
243
244 #include "mempool.h"
245
246 #if CMK_PERSISTENT_COMM
247 #define PERSISTENT_GET_BASE 0 
248 #if !PERSISTENT_GET_BASE
249 #define CMK_PERSISTENT_COMM_PUT 1 
250 #endif
251 #include "machine-persistent.h"
252 #define  POST_HIGHPRIORITY_RDMA    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
253 #else  
254 #define  POST_HIGHPRIORITY_RDMA   
255 #endif
256
257 #if REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM_PUT) 
258 #define  PUMP_REMOTE_HIGHPRIORITY    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
259 #else
260 #define  PUMP_REMOTE_HIGHPRIORITY
261 #endif
262
263 //#define  USE_ONESIDED 1
264 #ifdef USE_ONESIDED
265 //onesided implementation is wrong, since no place to restore omdh
266 #include "onesided.h"
267 onesided_hnd_t   onesided_hnd;
268 onesided_md_t    omdh;
269 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
270
271 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
272
273 #else
274 uint8_t   onesided_hnd, omdh;
275
/*
 * MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status)
 *
 * Registers the region starting at msg of length size through GNI_MemRegister,
 * unless register_memory_size + size would reach MAX_REG_MEM; in that case no
 * GNI call is made and status is set to GNI_RC_ERROR_NOMEM.  On
 * GNI_RC_SUCCESS the running total register_memory_size is increased by size.
 * handler and myomdh are not used by this (non-onesided) variant.
 *
 * With REMOTE_EVENT or CQWRITE enabled the caller's cqh is handed to
 * GNI_MemRegister; otherwise NULL is passed and cqh is ignored.
 *
 * Every argument is parenthesized at its point of use so that expression
 * arguments (e.g. "ptr + offset", "a + b") keep their intended meaning.
 * NOTE: the macro still expands to a bare if/else statement (kept for
 * call-site compatibility); do not use it as the unbraced body of an if.
 */
#if REMOTE_EVENT || CQWRITE 
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    if (register_memory_size + (size) >= MAX_REG_MEM) { \
        (status) = GNI_RC_ERROR_NOMEM; \
    } else { \
        (status) = GNI_MemRegister(nic_hndl, (uint64_t)(msg), (uint64_t)(size), cqh, GNI_MEM_READWRITE, -1, mem_hndl); \
        if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
    }
#else
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
    if (register_memory_size + (size) >= MAX_REG_MEM) { \
        (status) = GNI_RC_ERROR_NOMEM; \
    } else { \
        (status) = GNI_MemRegister(nic_hndl, (uint64_t)(msg), (uint64_t)(size), NULL, GNI_MEM_READWRITE, -1, mem_hndl); \
        if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
    }
#endif

/*
 * MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)
 *
 * Deregisters mem_hndl through GNI_MemDeregister.  On success the running
 * total register_memory_size is decreased by size; any other return code
 * aborts via CmiAbort.  handler and myomdh are unused in this variant.
 */
#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
    do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
             register_memory_size -= (size); \
         else CmiAbort("MEM_DEregister");  \
    } while (0)
295 #endif
296
297 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
298 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
299 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
300 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
301 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
302 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
303 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
304 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
/* Predicates on the mempool header looked up for x: true when the header's
 * msgs-in-send / msgs-in-recv value equals 0.  Each expansion is wrapped in
 * parentheses so it can be embedded safely in a larger expression
 * (arithmetic, comparison, ?:) without operator-precedence surprises. */
#define   NoMsgInSend(x)          (MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0)
#define   NoMsgInRecv(x)          (MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0)
#define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
308 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
309 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
310 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
311
312 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
313 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
314
315 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
316 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
317 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
318 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
319
320 #define ALIGNBUF                64
321
322 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
323 /* If SMSG is not used */
324
325 #define FMA_PER_CORE  1024
326 #define FMA_BUFFER_SIZE 1024
327
328 /* If SMSG is used */
329 static int  SMSG_MAX_MSG = 1024;
330 #define SMSG_MAX_CREDIT    72
331
332 #define MSGQ_MAXSIZE       2048
333
334 /* large message transfer with FMA or BTE */
335 #if ! REMOTE_EVENT
336 #define LRTS_GNI_RDMA_THRESHOLD  1024 
337 #else
338    /* remote events only work with RDMA */
339 #define LRTS_GNI_RDMA_THRESHOLD  0 
340 #endif
341
342 #if CMK_SMP
343 static int  REMOTE_QUEUE_ENTRIES=163840; 
344 static int LOCAL_QUEUE_ENTRIES=163840; 
345 #else
346 static int  REMOTE_QUEUE_ENTRIES=20480;
347 static int LOCAL_QUEUE_ENTRIES=20480; 
348 #endif
349
350 #define BIG_MSG_TAG             0x26
351 #define PUT_DONE_TAG            0x28
352 #define DIRECT_PUT_DONE_TAG     0x29
353 #define ACK_TAG                 0x30
354 /* SMSG is data message */
355 #define SMALL_DATA_TAG          0x31
356 /* SMSG is a control message to initialize a BTE */
357 #define LMSG_INIT_TAG           0x33 
358 #define LMSG_PERSISTENT_INIT_TAG           0x34 
359 #define LMSG_OOB_INIT_TAG       0x35
360
361 #define DEBUG
362 #ifdef GNI_RC_CHECK
363 #undef GNI_RC_CHECK
364 #endif
365 #ifdef DEBUG
366 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
367 #else
368 #define GNI_RC_CHECK(msg,rc)
369 #endif
370
371 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
372 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
373 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
374
375 static int useStaticMSGQ = 0;
376 static int useStaticFMA = 0;
377 static int mysize, myrank;
378 static gni_nic_handle_t   nic_hndl;
379
380 typedef struct {
381     gni_mem_handle_t mdh;
382     uint64_t addr;
383 } mdh_addr_t ;
384 // this is related to dynamic SMSG
385
386 typedef struct mdh_addr_list{
387     gni_mem_handle_t mdh;
388    void *addr;
389     struct mdh_addr_list *next;
390 }mdh_addr_list_t;
391
392 static unsigned int         smsg_memlen;
393 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
394 mdh_addr_t          setup_mem;
395 mdh_addr_t          *smsg_connection_vec = 0;
396 gni_mem_handle_t    smsg_connection_memhndl;
397 static int          smsg_expand_slots = 10;
398 static int          smsg_available_slot = 0;
399 static void         *smsg_mailbox_mempool = 0;
400 mdh_addr_list_t     *smsg_dynamic_list = 0;
401
402 static void             *smsg_mailbox_base;
403 gni_msgq_attr_t         msgq_attrs;
404 gni_msgq_handle_t       msgq_handle;
405 gni_msgq_ep_attr_t      msgq_ep_attrs;
406 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
407
408 /* =====Beginning of Declarations of Machine Specific Variables===== */
409 static int cookie;
410 static int modes = 0;
411 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
412 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
413 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
414 static gni_cq_handle_t       highprior_rdma_tx_cqh = NULL;      // rdma - local event
415 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
416 static gni_cq_handle_t       highpriority_rx_cqh = NULL;      // mempool - remote event
417 static gni_ep_handle_t       *ep_hndl_array;
418
419 static CmiNodeLock           *ep_lock_array;
420 static CmiNodeLock           default_tx_cq_lock; 
421 static CmiNodeLock           rdma_tx_cq_lock; 
422 static CmiNodeLock           global_gni_lock; 
423 static CmiNodeLock           rx_cq_lock;
424 static CmiNodeLock           smsg_mailbox_lock;
425 static CmiNodeLock           smsg_rx_cq_lock;
426 static CmiNodeLock           *mempool_lock;
427 //#define     CMK_WITH_STATS      1
428 typedef struct msg_list
429 {
430     uint32_t destNode;
431     uint32_t size;
432     void *msg;
433     uint8_t tag;
434 #if CMK_WITH_STATS
435     double  creation_time;
436 #endif
437 }MSG_LIST;
438
439
440 typedef struct control_msg
441 {
442     uint64_t            source_addr;    /* address from the start of buffer  */
443     uint64_t            dest_addr;      /* address from the start of buffer */
444     int                 total_length;   /* total length */
445     int                 length;         /* length of this packet */
446 #if REMOTE_EVENT
447     int                 ack_index;      /* index from integer to address */
448 #endif
449     int     seq_id;         //big message   0 meaning single message
450     gni_mem_handle_t    source_mem_hndl;
451 #if PERSISTENT_GET_BASE
452     gni_mem_handle_t    dest_mem_hndl;
453 #endif
454     struct control_msg  *next;
455 } CONTROL_MSG;
456
457 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
458
459 typedef struct ack_msg
460 {
461     uint64_t            source_addr;    /* address from the start of buffer  */
462 #if ! USE_LRTS_MEMPOOL
463     gni_mem_handle_t    source_mem_hndl;
464     int                 length;          /* total length */
465 #endif
466     struct ack_msg     *next;
467 } ACK_MSG;
468
469 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
470
471 #if CMK_DIRECT
472 typedef struct{
473     uint64_t    handler_addr;
474 }CMK_DIRECT_HEADER;
475
476 typedef struct {
477     char core[CmiMsgHeaderSizeBytes];
478     uint64_t handler;
479 }cmidirectMsg;
480
481 //SYH
482 CpvDeclare(int, CmiHandleDirectIdx);
483 void CmiHandleDirectMsg(cmidirectMsg* msg)
484 {
485
486     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
487    (*(_handle->callbackFnPtr))(_handle->callbackData);
488    CmiFree(msg);
489 }
490
491 void CmiDirectInit()
492 {
493     CpvInitialize(int,  CmiHandleDirectIdx);
494     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
495 }
496
497 #endif
498 typedef struct  rmda_msg
499 {
500     int                   destNode;
501 #if REMOTE_EVENT
502     int                   ack_index;
503 #endif
504     gni_post_descriptor_t *pd;
505 }RDMA_REQUEST;
506
507
508 #define SMP_LOCKS                       1
509 #define ONE_SEND_QUEUE                  0
510 typedef PCQueue BufferList;
511 typedef struct  msg_list_index
512 {
513     PCQueue       sendSmsgBuf;
514 #if  SMP_LOCKS
515     CmiNodeLock   lock;
516     int           pushed;
517     int           destpe;
518 #endif
519 } MSG_LIST_INDEX;
520 char                *destpe_avail;
521 PCQueue sendRdmaBuf;
522 PCQueue sendHighPriorBuf;
523 // buffered send queue
524 #if ! ONE_SEND_QUEUE
525 typedef struct smsg_queue
526 {
527     MSG_LIST_INDEX   *smsg_msglist_index;
528     int               smsg_head_index;
529 #if  SMP_LOCKS
530     PCQueue     nonEmptyQueues;
531 #endif
532 } SMSG_QUEUE;
533 #else
534 typedef struct smsg_queue
535 {
536     PCQueue       sendMsgBuf;
537 }  SMSG_QUEUE;
538 #endif
539
540 SMSG_QUEUE                  smsg_queue;
541 #if CMK_USE_OOB
542 SMSG_QUEUE                  smsg_oob_queue;
543 #define SEND_OOB_SMSG(x)            SendBufferMsg(&x, NULL);
544 #define PUMP_LOCAL_HIGHPRIORITY    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh,  rdma_tx_cq_lock)); 
545 #else
546 #define SEND_OOB_SMSG(x)            
547 #define PUMP_LOCAL_HIGHPRIORITY     
548 #endif
549
550 #define FreeMsgList(d)   free(d);
551 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
552
553 #define FreeControlMsg(d)      free(d);
554 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
555
556 #define FreeAckMsg(d)      free(d);
557 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
558
559 #define FreeRdmaRequest(d)       free(d);
560 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
561 /* reuse gni_post_descriptor_t */
562 static gni_post_descriptor_t *post_freelist=0;
563
564 #define FreePostDesc(d)     free(d);
565 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
566
567
568 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
569 static int      buffered_smsg_counter = 0;
570
571 /* SmsgSend return success but message sent is not confirmed by remote side */
572 static MSG_LIST *buffered_fma_head = 0;
573 static MSG_LIST *buffered_fma_tail = 0;
574
575 /* functions  */
/* Single-bit helpers on an integer lvalue/expression a, bit position ind.
 *   IsFree(a,ind)   - non-zero when bit ind of a is clear
 *   SET_BITS(a,ind) - sets bit ind of a (a must be an lvalue)
 *   Reset(a,ind)    - clears bit ind of a (a must be an lvalue)
 * Arguments and whole expansions are parenthesized so that expression
 * arguments such as "x | y" and use inside larger expressions behave as
 * expected.  ind must be smaller than the bit width of int. */
#define IsFree(a,ind)  (!((a) & (1<<(ind))))
#define SET_BITS(a,ind) ((a) = ((a) | (1<<(ind))))
#define Reset(a,ind) ((a) = ((a) & (~(1<<(ind)))))
579
580 CpvDeclare(mempool_type*, mempool);
581
582 #if CMK_PERSISTENT_COMM_PUT
583 CpvDeclare(mempool_type*, persistent_mempool);
584 #endif
585
586 #if REMOTE_EVENT
587 /* ack pool for remote events */
588
589 static int  SHIFT   =           18;
590 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
591 #define RANK_MASK               ((1<<SHIFT) - 1)
592 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
593
594 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
595 #define GET_RANK(evt)           ((evt) & RANK_MASK)
596 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
597
598 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
599 #define DIRECT_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
600
601 #if CMK_SMP
602 #define INIT_SIZE                4096
603 #else
604 #define INIT_SIZE                1024
605 #endif
606
/* One slot of an IndexPool.  Free slots are chained together through `next`
 * (see IndexPool_init / IndexPool_getslot / IndexPool_freeslot). */
struct IndexStruct {
void *addr;   // address stored in the slot by IndexPool_getslot
int next;     // index of the next free slot; -1 marks the end of the free list
int type;     // 0: free   1: ACK   2: Persistent
};
612
/* Growable table of IndexStruct slots with an index-linked free list. */
typedef struct IndexPool {
    struct IndexStruct   *indexes;   // malloc'ed slot array; replaced by a larger one when IndexPool_getslot finds no free slot
    int                   size;      // number of slots currently in indexes
    int                   freehead;  // index of the first free slot, -1 when none is free
    CmiNodeLock           lock;      // taken by getslot/freeslot when MULTI_THREAD_SEND is enabled
} IndexPool;
619
620 static IndexPool  ackPool;
621 #if CMK_PERSISTENT_COMM_PUT
622 static IndexPool  persistPool;
623 #else
624 #define persistPool ackPool 
625 #endif
626
627 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
628 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
629
630 static void IndexPool_init(IndexPool *pool)
631 {
632     int i;
633     if ((1<<SHIFT) < mysize) 
634         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
635     pool->size = INIT_SIZE;
636     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
637     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
638     for (i=0; i<pool->size-1; i++) {
639         pool->indexes[i].next = i+1;
640         pool->indexes[i].type = 0;
641     }
642     pool->indexes[i].next = -1;
643     pool->indexes[i].type = 0;
644     pool->freehead = 0;
645 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM_PUT
646     pool->lock  = CmiCreateLock();
647 #else
648     pool->lock  = 0;
649 #endif
650 }
651
652 static int IndexPool_getslot(IndexPool *pool, void *addr, int type)
653 {
654     int s, i;
655 #if MULTI_THREAD_SEND  
656     CmiLock(pool->lock);
657 #endif
658     CmiAssert(type == 1 || type == 2);
659     s = pool->freehead;
660     if (s == -1) {
661         int newsize = pool->size * 2;
662         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
663         if (newsize > (1<<(32-SHIFT-1))) {
664             static int warned = 0;
665             if (!warned)
666               printf("[%d] Warning: IndexPool_getslot %p overflow when expanding to: %d\n", myrank, pool, newsize);
667             warned = 1;
668             return -1;
669             CmiAbort("IndexPool for remote events overflows, try compile Charm++ with remote event disabled.");
670         }
671         struct IndexStruct *old_ackpool = pool->indexes;
672         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
673         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
674         for (i=pool->size; i<newsize-1; i++) {
675             pool->indexes[i].next = i+1;
676             pool->indexes[i].type = 0;
677         }
678         pool->indexes[i].next = -1;
679         pool->indexes[i].type = 0;
680         pool->freehead = pool->size;
681         s = pool->size;
682         pool->size = newsize;
683         free(old_ackpool);
684     }
685     pool->freehead = pool->indexes[s].next;
686     pool->indexes[s].addr = addr;
687     CmiAssert(pool->indexes[s].type == 0);
688     pool->indexes[s].type = type;
689 #if MULTI_THREAD_SEND
690     CmiUnlock(pool->lock);
691 #endif
692     return s;
693 }
694
695 static void IndexPool_freeslot(IndexPool *pool, int s)
696 {
697     CmiAssert(s>=0 && s<pool->size);
698 #if MULTI_THREAD_SEND
699     CmiLock(pool->lock);
700 #endif
701     pool->indexes[s].next = pool->freehead;
702     pool->indexes[s].type = 0;
703     pool->freehead = s;
704 #if MULTI_THREAD_SEND
705     CmiUnlock(pool->lock);
706 #endif
707 }
708
709
710 #endif
711
712 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
713 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
714 #define CHARM_MAGIC_NUMBER               126
715
716 #if CMK_ERROR_CHECKING
717 extern unsigned char computeCheckSum(unsigned char *data, int len);
718 static int checksum_flag = 0;
719 #define CMI_SET_CHECKSUM(msg, len)      \
720         if (checksum_flag)  {   \
721           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
722           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
723         }
724 #define CMI_CHECK_CHECKSUM(msg, len)    \
725         if (checksum_flag)      \
726           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
727             CmiAbort("Fatal error: checksum doesn't agree!\n");
728 #else
729 #define CMI_SET_CHECKSUM(msg, len)
730 #define CMI_CHECK_CHECKSUM(msg, len)
731 #endif
732 /* =====End of Definitions of Message-Corruption Related Macros=====*/
733
/* Statistics switches.  The stats macros in this file record data only while
 * print_stats is non-zero and stats_off is zero. */
static int print_stats = 0;
static int stats_off = 0;

/* Re-enable statistics recording by clearing stats_off. */
void CmiTurnOnStats()
{
    stats_off = 0;
}

/* Suspend statistics recording by setting stats_off. */
void CmiTurnOffStats()
{
    stats_off = 1;
}
746
747 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
748
749 #if CMK_WITH_STATS
750 FILE *counterLog = NULL;
751 typedef struct comm_thread_stats
752 {
753     uint64_t  smsg_data_count;
754     uint64_t  lmsg_init_count;
755     uint64_t  ack_count;
756     uint64_t  big_msg_ack_count;
757     uint64_t  smsg_count;
758     uint64_t  direct_put_done_count;
759     uint64_t  put_done_count;
760     //times of calling SmsgSend
761     uint64_t  try_smsg_data_count;
762     uint64_t  try_lmsg_init_count;
763     uint64_t  try_ack_count;
764     uint64_t  try_big_msg_ack_count;
765     uint64_t  try_direct_put_done_count;
766     uint64_t  try_put_done_count;
767     uint64_t  try_smsg_count;
768     
769     double    max_time_in_send_buffered_smsg;
770     double    all_time_in_send_buffered_smsg;
771
772     uint64_t  rdma_get_count, rdma_put_count;
773     uint64_t  try_rdma_get_count, try_rdma_put_count;
774     double    max_time_from_control_to_rdma_init;
775     double    all_time_from_control_to_rdma_init;
776
777     double    max_time_from_rdma_init_to_rdma_done;
778     double    all_time_from_rdma_init_to_rdma_done;
779
780     int      count_in_PumpNetwork;
781     double   time_in_PumpNetwork;
782     double   max_time_in_PumpNetwork;
783     int      count_in_SendBufferMsg_smsg;
784     double   time_in_SendBufferMsg_smsg;
785     double   max_time_in_SendBufferMsg_smsg;
786     int      count_in_SendRdmaMsg;
787     double   time_in_SendRdmaMsg;
788     double   max_time_in_SendRdmaMsg;
789     int      count_in_PumpRemoteTransactions;
790     double   time_in_PumpRemoteTransactions;
791     double   max_time_in_PumpRemoteTransactions;
792     int      count_in_PumpLocalTransactions_rdma;
793     double   time_in_PumpLocalTransactions_rdma;
794     double   max_time_in_PumpLocalTransactions_rdma;
795     int      count_in_PumpDatagramConnection;
796     double   time_in_PumpDatagramConnection;
797     double   max_time_in_PumpDatagramConnection;
798 } Comm_Thread_Stats;
799
800 static Comm_Thread_Stats   comm_stats;
801
802 static char *counters_dirname = "counters";
803
804 static void init_comm_stats()
805 {
806   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
807   if (print_stats){
808       char ln[200];
809       int code = mkdir(counters_dirname, 00777); 
810       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
811       counterLog=fopen(ln,"w");
812       if (counterLog == NULL) CmiAbort("Counter files open failed");
813   }
814 }
815
816 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
817
818 #define SMSG_SENT_DONE(creation_time, tag)  \
819         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
820             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++;  \
821             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
822             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
823             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
824             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
825             comm_stats.smsg_count++; \
826             double inbuff_time = CmiWallTimer() - creation_time;   \
827             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
828             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
829         }
830
831 #define SMSG_TRY_SEND(tag)  \
832         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
833             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
834             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
835             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
836             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
837             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
838             comm_stats.try_smsg_count++; \
839         }
840
841 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
842
843 #define  RDMA_TRANS_DONE(x)      \
844          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
845              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
846              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
847          }
848
849 #define  RDMA_TRANS_INIT(type, x)      \
850          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
851              double rdma_trans_time = CmiWallTimer() - x ; \
852              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
853              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
854          }
855
/*
 * Shared body of the STATS_*_TIME wrappers below.
 *
 * Executes the wrapped code (the variadic part), measures it with
 * CmiWallTimer() and updates three comm_stats fields selected by `name`:
 *   comm_stats.count_in_<name>     - incremented once per use
 *   comm_stats.time_in_<name>      - accumulated elapsed time
 *   comm_stats.max_time_in_<name>  - largest single elapsed time
 *
 * The timer lives in a distinctly named local so that it cannot capture an
 * identifier (such as `t`) used by the wrapped code.  The wrapped code is
 * taken as __VA_ARGS__ so that commas inside it survive the extra macro
 * level.
 */
#define STATS_TIME_SECTION(name, ...)   \
        { double stats_section_elapsed = CmiWallTimer(); \
          __VA_ARGS__;        \
          stats_section_elapsed = CmiWallTimer() - stats_section_elapsed;   \
          comm_stats.count_in_##name ++;        \
          comm_stats.time_in_##name += stats_section_elapsed;   \
          if (stats_section_elapsed > comm_stats.max_time_in_##name)      \
              comm_stats.max_time_in_##name = stats_section_elapsed;    \
        }

/* Each wrapper times `x` and charges it to the matching comm_stats fields. */
#define STATS_PUMPNETWORK_TIME(x)   \
        STATS_TIME_SECTION(PumpNetwork, x)

#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
        STATS_TIME_SECTION(PumpRemoteTransactions, x)

#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
        STATS_TIME_SECTION(PumpLocalTransactions_rdma, x)

#define STATS_SEND_SMSGS_TIME(x)   \
        STATS_TIME_SECTION(SendBufferMsg_smsg, x)

#define STATS_SENDRDMAMSG_TIME(x)   \
        STATS_TIME_SECTION(SendRdmaMsg, x)

#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
        STATS_TIME_SECTION(PumpDatagramConnection, x)
915
916 static void print_comm_stats()
917 {
918     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
919     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
920             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
921             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
922     
923     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
924             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
925             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
926
927     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
928     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
929     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
930
931
932     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
933     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
934     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
935     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
936     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
937     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
938     if (useDynamicSMSG)
939     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
940
941     fclose(counterLog);
942 }
943
944 #else
/* Statistics disabled: every timing wrapper expands to just the wrapped code. */
#define STATS_PUMPNETWORK_TIME(x)                x
#define STATS_PUMPREMOTETRANSACTIONS_TIME(x)     x
#define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x) x
#define STATS_SEND_SMSGS_TIME(x)                 x
#define STATS_SENDRDMAMSG_TIME(x)                x
#define STATS_PUMPDATAGRAMCONNECTION_TIME(x)     x
951 #endif
952
953 static void
954 allgather(void *in,void *out, int len)
955 {
956     static int *ivec_ptr=NULL,already_called=0,job_size=0;
957     int i,rc;
958     int my_rank;
959     char *tmp_buf,*out_ptr;
960
961     if(!already_called) {
962
963         rc = PMI_Get_size(&job_size);
964         CmiAssert(rc == PMI_SUCCESS);
965         rc = PMI_Get_rank(&my_rank);
966         CmiAssert(rc == PMI_SUCCESS);
967
968         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
969         CmiAssert(ivec_ptr != NULL);
970
971         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
972         CmiAssert(rc == PMI_SUCCESS);
973
974         already_called = 1;
975
976     }
977
978     tmp_buf = (char *)malloc(job_size * len);
979     CmiAssert(tmp_buf);
980
981     rc = PMI_Allgather(in,tmp_buf,len);
982     CmiAssert(rc == PMI_SUCCESS);
983
984     out_ptr = out;
985
986     for(i=0;i<job_size;i++) {
987
988         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
989
990     }
991
992     free(tmp_buf);
993 }
994
995 static void
996 allgather_2(void *in,void *out, int len)
997 {
998     //PMI_Allgather is out of order
999     int i,rc, extend_len;
1000     int  rank_index;
1001     char *out_ptr, *out_ref;
1002     char *in2;
1003
1004     extend_len = sizeof(int) + len;
1005     in2 = (char*)malloc(extend_len);
1006
1007     memcpy(in2, &myrank, sizeof(int));
1008     memcpy(in2+sizeof(int), in, len);
1009
1010     out_ptr = (char*)malloc(mysize*extend_len);
1011
1012     rc = PMI_Allgather(in2, out_ptr, extend_len);
1013     GNI_RC_CHECK("allgather", rc);
1014
1015     out_ref = out;
1016
1017     for(i=0;i<mysize;i++) {
1018         //rank index 
1019         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1020         //copy to the rank index slot
1021         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1022     }
1023
1024     free(out_ptr);
1025     free(in2);
1026
1027 }
1028
1029 static unsigned int get_gni_nic_address(int device_id)
1030 {
1031     unsigned int address, cpu_id;
1032     gni_return_t status;
1033     int i, alps_dev_id=-1,alps_address=-1;
1034     char *token, *p_ptr;
1035
1036     p_ptr = getenv("PMI_GNI_DEV_ID");
1037     if (!p_ptr) {
1038         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1039        
1040         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1041     } else {
1042         while ((token = strtok(p_ptr,":")) != NULL) {
1043             alps_dev_id = atoi(token);
1044             if (alps_dev_id == device_id) {
1045                 break;
1046             }
1047             p_ptr = NULL;
1048         }
1049         CmiAssert(alps_dev_id != -1);
1050         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1051         CmiAssert(p_ptr != NULL);
1052         i = 0;
1053         while ((token = strtok(p_ptr,":")) != NULL) {
1054             if (i == alps_dev_id) {
1055                 alps_address = atoi(token);
1056                 break;
1057             }
1058             p_ptr = NULL;
1059             ++i;
1060         }
1061         CmiAssert(alps_address != -1);
1062         address = alps_address;
1063     }
1064     return address;
1065 }
1066
/*
 * Return the protection tag: the first ':'-separated field of the
 * PMI_GNI_PTAG environment variable, converted to uint8_t.
 *
 * The value is parsed in place with strtol(), which stops at the ':' that
 * ends the field, so the environment string is not modified and a variable
 * without any field no longer passes NULL to the conversion.
 */
static uint8_t get_ptag(void)
{
    const char *p_ptr;

    p_ptr = getenv("PMI_GNI_PTAG");
    CmiAssert(p_ptr != NULL);
    while (*p_ptr == ':') p_ptr++;      /* skip leading separators, as strtok did */
    CmiAssert(*p_ptr != '\0');
    return (uint8_t)strtol(p_ptr, NULL, 10);
}
1079
/*
 * Return the cookie: the first ':'-separated field of the PMI_GNI_COOKIE
 * environment variable, converted to uint32_t.
 *
 * strtoul() is used because the cookie is a 32-bit unsigned value and
 * atoi() overflows for values above INT_MAX.  Parsing happens in place
 * (strtoul stops at the ':' that ends the field), so the environment
 * string is not modified.
 */
static uint32_t get_cookie(void)
{
    const char *p_ptr;

    p_ptr = getenv("PMI_GNI_COOKIE");
    CmiAssert(p_ptr != NULL);
    while (*p_ptr == ':') p_ptr++;      /* skip leading separators, as strtok did */
    CmiAssert(*p_ptr != '\0');

    return (uint32_t)strtoul(p_ptr, NULL, 10);
}
1092
1093 #if LARGEPAGE
1094
1095 /* directly mmap memory from hugetlbfs for large pages */
1096
1097 #include <sys/stat.h>
1098 #include <fcntl.h>
1099 #include <sys/mman.h>
1100 #include <hugetlbfs.h>
1101
1102 // size must be _tlbpagesize aligned
1103 void *my_get_huge_pages(size_t size)
1104 {
1105     char filename[512];
1106     int fd;
1107     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1108     void *ptr = NULL;
1109
1110     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1111     fd = open(filename, O_RDWR | O_CREAT, mode);
1112     if (fd == -1) {
1113         CmiAbort("my_get_huge_pages: open filed");
1114     }
1115     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1116     if (ptr == MAP_FAILED) ptr = NULL;
1117 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1118     close(fd);
1119     unlink(filename);
1120     return ptr;
1121 }
1122
/*
 * Unmap a region obtained from my_get_huge_pages().
 *   ptr  - start of the mapping
 *   size - length of the mapping in bytes
 * Aborts if munmap() reports an error.
 */
void my_free_huge_pages(void *ptr, int size)
{
    if (munmap(ptr, size) == -1)
        CmiAbort("munmap failed in my_free_huge_pages");
}
1129
1130 #endif
1131
1132 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1133 /* TODO: add any that are related */
1134 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1135
1136
1137 #include "machine-lrts.h"
1138 #include "machine-common-core.c"
1139
1140
1141 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
1142 static void SendRdmaMsg(PCQueue );
1143 static void PumpNetworkSmsg();
1144 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1145 #if CQWRITE
1146 static void PumpCqWriteTransactions();
1147 #endif
1148 #if REMOTE_EVENT
1149 static void PumpRemoteTransactions(gni_cq_handle_t);
1150 #endif
1151
1152 #if MACHINE_DEBUG_LOG
1153 static CmiInt8 buffered_recv_msg = 0;
1154 int         lrts_smsg_success = 0;
1155 int         lrts_received_msg = 0;
1156 #endif
1157
1158 static void sweep_mempool(mempool_type *mptr)
1159 {
1160     int n = 0;
1161     block_header *current = &(mptr->block_head);
1162
1163     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1164     while( current!= NULL) {
1165         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1166         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1167     }
1168     printf("[n %d] sweep_mempool slot END.\n", myrank);
1169 }
1170
1171
1172 static INLINE_KEYWORD  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1173 {
1174     block_header *current = *from;
1175
1176     //while(register_memory_size>= MAX_REG_MEM)
1177     //{
1178         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1179             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1180
1181         *from = current;
1182         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1183         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1184         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1185     //}
1186     return GNI_RC_SUCCESS;
1187 }
1188
1189 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1190 {
1191     gni_return_t status = GNI_RC_SUCCESS;
1192     //int size = GetMempoolsize(msg);
1193     //void *blockaddr = GetMempoolBlockPtr(msg);
1194     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1195    
1196     block_header *current = &(mptr->block_head);
1197     while(register_memory_size>= MAX_REG_MEM)
1198     {
1199         status = deregisterMemory(mptr, &current);
1200         if (status != GNI_RC_SUCCESS) break;
1201     }
1202     if(register_memory_size>= MAX_REG_MEM) return status;
1203
1204     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1205     while(1)
1206     {
1207         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1208         if(status == GNI_RC_SUCCESS)
1209         {
1210             break;
1211         }
1212         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1213         {
1214             GNI_RC_CHECK("registerFromMempool", status);
1215         }
1216         else
1217         {
1218             status = deregisterMemory(mptr, &current);
1219             if (status != GNI_RC_SUCCESS) break;
1220         }
1221     }; 
1222     return status;
1223 }
1224
1225 INLINE_KEYWORD
1226 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1227 {
1228     static int rank = -1;
1229     int i;
1230     gni_return_t status;
1231     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1232     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1233     mempool_type *mptr;
1234
1235     status = registerFromMempool(mptr1, msg, size, t, cqh);
1236     if (status == GNI_RC_SUCCESS) return status;
1237 #if CMK_SMP 
1238     for (i=0; i<CmiMyNodeSize()+1; i++) {
1239       rank = (rank+1)%(CmiMyNodeSize()+1);
1240       mptr = CpvAccessOther(mempool, rank);
1241       if (mptr == mptr1) continue;
1242       status = registerFromMempool(mptr, msg, size, t, cqh);
1243       if (status == GNI_RC_SUCCESS) return status;
1244     }
1245 #endif
1246     return  GNI_RC_ERROR_RESOURCE;
1247 }
1248
1249 INLINE_KEYWORD
1250 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1251 {
1252     MSG_LIST        *msg_tmp;
1253     MallocMsgList(msg_tmp);
1254     msg_tmp->destNode = destNode;
1255     msg_tmp->size   = size;
1256     msg_tmp->msg    = msg;
1257     msg_tmp->tag    = tag;
1258 #if CMK_WITH_STATS
1259     SMSG_CREATION(msg_tmp)
1260 #endif
1261
1262 #if ONE_SEND_QUEUE
1263     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1264 #else
1265 #if SMP_LOCKS
1266     CmiLock(queue->smsg_msglist_index[destNode].lock);
1267     if(queue->smsg_msglist_index[destNode].pushed == 0)
1268     {
1269         PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1270     }
1271     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1272     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1273 #else
1274     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1275 #endif
1276 #endif
1277
1278 #if PRINT_SYH
1279     buffered_smsg_counter++;
1280 #endif
1281 }
1282
1283 INLINE_KEYWORD static void print_smsg_attr(gni_smsg_attr_t     *a)
1284 {
1285     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1286 }
/*
 * Disabled code (#if 0): never compiled.
 *
 * setup_smsg_connection(destNode) builds a local SMSG attribute for
 * destNode, carving its mailbox out of smsg_dynamic_list (a new registered
 * chunk is allocated when all smsg_expand_slots slots are used), and posts
 * the attribute to the peer with an FMA PUT.
 *
 * NOTE(review): the code is unfinished - the status of GNI_MemRegister is
 * never checked, and the GNI_RC_ERROR_RESOURCE branch fills in an
 * RDMA_REQUEST but never queues it.
 */
1287 #if 0
1288 INLINE_KEYWORD
1289 static void setup_smsg_connection(int destNode)
1290 {
1291     mdh_addr_list_t  *new_entry = 0;
1292     gni_post_descriptor_t *pd;
1293     gni_smsg_attr_t      *smsg_attr;
1294     gni_return_t status = GNI_RC_NOT_DONE;
1295     RDMA_REQUEST        *rdma_request_msg;
1296     
1297     if(smsg_available_slot == smsg_expand_slots)
1298     {
1299         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1300         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1301         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1302
1303         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1304             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1305             GNI_MEM_READWRITE,   
1306             -1,
1307             &(new_entry->mdh));
1308         smsg_available_slot = 0; 
1309         new_entry->next = smsg_dynamic_list;
1310         smsg_dynamic_list = new_entry;
1311     }
1312     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1313     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1314     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1315     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1316     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1317     smsg_attr->buff_size = smsg_memlen;
1318     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1319     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1320     smsg_local_attr_vec[destNode] = smsg_attr;
1321     smsg_available_slot++;
1322     MallocPostDesc(pd);
1323     pd->type            = GNI_POST_FMA_PUT;
1324     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1325     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1326     pd->length          = sizeof(gni_smsg_attr_t);
1327     pd->local_addr      = (uint64_t) smsg_attr;
1328     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1329     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1330     pd->src_cq_hndl     = 0;
1331
1332     pd->rdma_mode       = 0;
1333     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1334     print_smsg_attr(smsg_attr);
1335     if(status == GNI_RC_ERROR_RESOURCE )
1336     {
1337         MallocRdmaRequest(rdma_request_msg);
1338         rdma_request_msg->destNode = destNode;
1339         rdma_request_msg->pd = pd;
1340         /* buffer this request */
1341     }
1342 #if PRINT_SYH
1343     if(status != GNI_RC_SUCCESS)
1344        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1345     else
1346         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1347 #endif
1348 }
1349 #endif
1350 /* useDynamicSMSG */
1351 INLINE_KEYWORD
1352 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1353 {
1354     gni_return_t status = GNI_RC_NOT_DONE;
1355
1356     if(mailbox_list->offset == mailbox_list->size)
1357     {
1358         dynamic_smsg_mailbox_t *new_mailbox_entry;
1359         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1360         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1361         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1362         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1363         new_mailbox_entry->offset = 0;
1364         
1365         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1366             new_mailbox_entry->size, smsg_rx_cqh,
1367             GNI_MEM_READWRITE,   
1368             -1,
1369             &(new_mailbox_entry->mem_hndl));
1370
1371         GNI_RC_CHECK("register", status);
1372         new_mailbox_entry->next = mailbox_list;
1373         mailbox_list = new_mailbox_entry;
1374     }
1375     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1376     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1377     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1378     local_smsg_attr->mbox_offset = mailbox_list->offset;
1379     mailbox_list->offset += smsg_memlen;
1380     local_smsg_attr->buff_size = smsg_memlen;
1381     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1382     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1383 }
1384
1385 /* useDynamicSMSG */
1386 INLINE_KEYWORD
1387 static int connect_to(int destNode)
1388 {
1389     gni_return_t status = GNI_RC_NOT_DONE;
1390     CmiAssert(smsg_connected_flag[destNode] == 0);
1391     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1392     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1393     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1394     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1395     
1396     CMI_GNI_LOCK(global_gni_lock)
1397     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1398     CMI_GNI_UNLOCK(global_gni_lock)
1399     if (status == GNI_RC_ERROR_RESOURCE) {
1400       /* possibly destNode is making connection at the same time */
1401       free(smsg_attr_vector_local[destNode]);
1402       smsg_attr_vector_local[destNode] = NULL;
1403       free(smsg_attr_vector_remote[destNode]);
1404       smsg_attr_vector_remote[destNode] = NULL;
1405       mailbox_list->offset -= smsg_memlen;
1406 #if PRINT_SYH
1407     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1408 #endif
1409       return 0;
1410     }
1411     GNI_RC_CHECK("GNI_Post", status);
1412     smsg_connected_flag[destNode] = 1;
1413 #if PRINT_SYH
1414     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1415 #endif
1416     return 1;
1417 }
1418
1419 INLINE_KEYWORD
1420 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1421 {
1422     unsigned int          remote_address;
1423     uint32_t              remote_id;
1424     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1425     gni_smsg_attr_t       *smsg_attr;
1426     gni_post_descriptor_t *pd;
1427     gni_post_state_t      post_state;
1428     char                  *real_data; 
1429
1430     if (useDynamicSMSG) {
1431         switch (smsg_connected_flag[destNode]) {
1432         case 0: 
1433             connect_to(destNode);         /* continue to case 1 */
1434         case 1:                           /* pending connection, do nothing */
1435             status = GNI_RC_NOT_DONE;
1436             if(inbuff ==0)
1437                 buffer_small_msgs(queue, msg, size, destNode, tag);
1438             return status;
1439         }
1440     }
1441 #if ! ONE_SEND_QUEUE
1442     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1443 #endif
1444     {
1445         //CMI_GNI_LOCK(smsg_mailbox_lock)
1446         CMI_GNI_LOCK(default_tx_cq_lock)
1447 #if CMK_SMP_TRACE_COMMTHREAD
1448         int oldpe = -1;
1449         int oldeventid = -1;
1450         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG)
1451         { 
1452             START_EVENT();
1453             if ( tag == SMALL_DATA_TAG)
1454                 real_data = (char*)msg; 
1455             else 
1456                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1457             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1458             TRACE_COMM_SET_COMM_MSGID(real_data);
1459         }
1460 #endif
1461 #if REMOTE_EVENT
1462         if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG) {
1463             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1464             if (control_msg_tmp->seq_id <= 0 && control_msg_tmp->ack_index == -1)
1465             {
1466                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1467                 if (control_msg_tmp->ack_index == -1) {    /* table overflow */
1468                     status = GNI_RC_NOT_DONE;
1469                     if (inbuff ==0)
1470                         buffer_small_msgs(queue, msg, size, destNode, tag);
1471                     return status;
1472                 }
1473             }
1474         }
1475 #endif
1476 #if     CMK_WITH_STATS
1477         SMSG_TRY_SEND(tag)
1478 #endif
1479 #if CMK_WITH_STATS
1480     double              creation_time;
1481     if (ptr == NULL)
1482         creation_time = CmiWallTimer();
1483     else
1484         creation_time = ptr->creation_time;
1485 #endif
1486
1487     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1488 #if CMK_SMP_TRACE_COMMTHREAD
1489         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1490 #endif
1491         CMI_GNI_UNLOCK(default_tx_cq_lock)
1492         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1493         if(status == GNI_RC_SUCCESS)
1494         {
1495 #if     CMK_WITH_STATS
1496             SMSG_SENT_DONE(creation_time,tag) 
1497 #endif
1498 #if CMK_SMP_TRACE_COMMTHREAD
1499             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG )
1500             { 
1501                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1502             }
1503 #endif
1504         }else
1505             status = GNI_RC_ERROR_RESOURCE;
1506     }
1507     if(status != GNI_RC_SUCCESS && inbuff ==0)
1508         buffer_small_msgs(queue, msg, size, destNode, tag);
1509     return status;
1510 }
1511
1512 INLINE_KEYWORD
1513 static CONTROL_MSG* construct_control_msg(int size, char *msg, int seqno)
1514 {
1515     /* construct a control message and send */
1516     CONTROL_MSG         *control_msg_tmp;
1517     MallocControlMsg(control_msg_tmp);
1518     control_msg_tmp->source_addr = (uint64_t)msg;
1519     control_msg_tmp->seq_id    = seqno;
1520     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1521 #if REMOTE_EVENT
1522     control_msg_tmp->ack_index    =  -1;
1523 #endif
1524 #if     USE_LRTS_MEMPOOL
1525     if(size < BIG_MSG)
1526     {
1527         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1528     }
1529     else
1530     {
1531         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1532         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1533         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1534     }
1535 #else
1536     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1537 #endif
1538     return control_msg_tmp;
1539 }
1540
1541 #define BLOCKING_SEND_CONTROL    0
1542
1543 // Large message, send control to receiver, receiver register memory and do a GET, 
1544 // return 1 - send no success
1545 INLINE_KEYWORD static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
1546 {
1547     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1548     uint32_t            vmdh_index  = -1;
1549     int                 size;
1550     int                 offset = 0;
1551     uint64_t            source_addr;
1552     int                 register_size; 
1553     void                *msg;
1554
1555     size    =   control_msg_tmp->total_length;
1556     source_addr = control_msg_tmp->source_addr;
1557     register_size = control_msg_tmp->length;
1558
1559 #if  USE_LRTS_MEMPOOL
1560     if( control_msg_tmp->seq_id <=0 ){
1561 #if BLOCKING_SEND_CONTROL
1562         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1563             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1564                 LrtsAdvanceCommunication(0);
1565         }
1566 #endif
1567         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1568         {
1569             msg = (void*)source_addr;
1570             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1571             {
1572                 if(!inbuff)
1573                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1574                 return GNI_RC_ERROR_NOMEM;
1575             }
1576             //register the corresponding mempool
1577             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1578             if(status == GNI_RC_SUCCESS)
1579             {
1580                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1581             }
1582         }else
1583         {
1584             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1585             status = GNI_RC_SUCCESS;
1586         }
1587         if(NoMsgInSend(source_addr))
1588             register_size = GetMempoolsize((void*)(source_addr));
1589         else
1590             register_size = 0;
1591     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1592     {
1593         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1594         source_addr += offset;
1595         size = control_msg_tmp->length;
1596 #if BLOCKING_SEND_CONTROL
1597         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1598             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1599                 LrtsAdvanceCommunication(0);
1600         }
1601 #endif
1602         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1603             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1604             {
1605                 if(!inbuff)
1606                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1607                 return GNI_RC_ERROR_NOMEM;
1608             }
1609             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1610             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1611         }
1612         else
1613         {
1614             status = GNI_RC_SUCCESS;
1615         }
1616         register_size = 0;  
1617     }
1618
1619 #if CMI_EXERT_SEND_LARGE_CAP
1620     if(SEND_large_pending >= SEND_large_cap)
1621     {
1622         status = GNI_RC_ERROR_NOMEM;
1623     }
1624 #endif
1625  
1626     if(status == GNI_RC_SUCCESS)
1627     {
1628        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr); 
1629         if(status == GNI_RC_SUCCESS)
1630         {
1631 #if CMI_EXERT_SEND_LARGE_CAP
1632             SEND_large_pending++;
1633 #endif
1634             buffered_send_msg += register_size;
1635             if(control_msg_tmp->seq_id == 0)
1636             {
1637                 IncreaseMsgInSend(source_addr);
1638             }
1639             FreeControlMsg(control_msg_tmp);
1640             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag); 
1641         }else
1642             status = GNI_RC_ERROR_RESOURCE;
1643
1644     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1645     {
1646         CmiAbort("Memory registor for large msg\n");
1647     }else 
1648     {
1649         status = GNI_RC_ERROR_NOMEM; 
1650         if(!inbuff)
1651             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1652     }
1653     return status;
1654 #else
1655     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1656     if(status == GNI_RC_SUCCESS)
1657     {
1658         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL);  
1659         if(status == GNI_RC_SUCCESS)
1660         {
1661             FreeControlMsg(control_msg_tmp);
1662         }
1663     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1664     {
1665         CmiAbort("Memory registor for large msg\n");
1666     }else 
1667     {
1668         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1669     }
1670     return status;
1671 #endif
1672 }
1673
1674 INLINE_KEYWORD void LrtsBeginIdle() {}
1675
1676 INLINE_KEYWORD void LrtsStillIdle() {}
1677
1678 INLINE_KEYWORD void LrtsNotifyIdle() {}
1679
1680 INLINE_KEYWORD void LrtsPrepareEnvelope(char *msg, int size)
1681 {
1682     CmiSetMsgSize(msg, size);
1683     CMI_SET_CHECKSUM(msg, size);
1684 }
1685
1686 CmiCommHandle LrtsSendFunc(int destNode, int destPE, int size, char *msg, int mode)
1687 {
1688     gni_return_t        status  =   GNI_RC_SUCCESS;
1689     uint8_t tag;
1690     CONTROL_MSG         *control_msg_tmp;
1691     int                 oob = ( mode & OUT_OF_BAND);
1692     SMSG_QUEUE          *queue;
1693
1694     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1695 #if CMK_USE_OOB
1696     queue = oob? &smsg_oob_queue : &smsg_queue;
1697     tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
1698 #else
1699     queue = &smsg_queue;
1700     tag = LMSG_INIT_TAG;
1701 #endif
1702
1703     LrtsPrepareEnvelope(msg, size);
1704
1705 #if PRINT_SYH
1706     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1707 #endif 
1708
1709 #if CMK_SMP 
1710     if(size <= SMSG_MAX_MSG)
1711         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1712     else if (size < BIG_MSG) {
1713         control_msg_tmp =  construct_control_msg(size, msg, 0);
1714         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1715     }
1716     else {
1717         CmiSetMsgSeq(msg, 0);
1718         control_msg_tmp =  construct_control_msg(size, msg, 1);
1719         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1720     }
1721 #else   //non-smp, smp(worker sending)
1722     if(size <= SMSG_MAX_MSG)
1723     {
1724         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1725             CmiFree(msg);
1726     }
1727     else if (size < BIG_MSG) {
1728         control_msg_tmp =  construct_control_msg(size, msg, 0);
1729         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1730     }
1731     else {
1732 #if     USE_LRTS_MEMPOOL
1733         CmiSetMsgSeq(msg, 0);
1734         control_msg_tmp =  construct_control_msg(size, msg, 1);
1735         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1736 #else
1737         control_msg_tmp =  construct_control_msg(size, msg, 0);
1738         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1739 #endif
1740     }
1741 #endif
1742     return 0;
1743 }
1744
1745 #if 0
1746 // this is no different from the common code
/*
 * Send `msg` (`len` bytes) to every PE listed in pes[0 .. npes-1].
 * With CMK_BROADCAST_USE_CMIREFERENCE, remote destinations take an extra
 * reference on the message and use CmiSyncSendAndFree; the local PE (and
 * every PE in the other configuration) uses a plain CmiSyncSend.
 */
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
  int k;
  for (k = 0; k < npes; k++) {
#if CMK_BROADCAST_USE_CMIREFERENCE
    if (pes[k] != CmiMyPe()) {
      CmiReference(msg);
      CmiSyncSendAndFree(pes[k], len, msg);
    } else {
      CmiSyncSend(pes[k], len, msg);
    }
#else
    CmiSyncSend(pes[k], len, msg);
#endif
  }
}
1765
1766 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1767 {
1768   /* A better asynchronous implementation may be wanted, but at least it works */
1769   CmiSyncListSendFn(npes, pes, len, msg);
1770   return (CmiCommHandle) 0;
1771 }
1772
/*
 * Send `msg` to every PE in pes[0 .. npes-1] and give up ownership of it.
 *
 *  - npes == 1: a single CmiSyncSendAndFree.
 *  - CMK_PERSISTENT_COMM, when CpvAccess(phs) is set and len exceeds
 *    PERSIST_MIN_SIZE (and, under CMK_SMP, IS_PERSISTENT_MEMORY(msg) holds):
 *    remote PEs get a referenced send-and-free, the local PE a plain send,
 *    and the caller's reference is dropped afterwards.
 *  - otherwise: either delegate to CmiSyncListSendFn and free the message
 *    (CMK_BROADCAST_USE_CMIREFERENCE), or plain-send to all but the last PE
 *    and send-and-free to the last one (freeing directly if npes <= 0).
 */
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
  if (npes == 1) {
      CmiSyncSendAndFree(pes[0], len, msg);
      return;
  }

#if CMK_PERSISTENT_COMM
  {
      int k;
      int via_reference = CpvAccess(phs) && len > PERSIST_MIN_SIZE;
#if CMK_SMP
      via_reference = via_reference && IS_PERSISTENT_MEMORY(msg);
#endif
      if (via_reference) {
          for (k = 0; k < npes; k++) {
              if (pes[k] != CmiMyPe()) {
                  CmiReference(msg);
                  CmiSyncSendAndFree(pes[k], len, msg);
              } else {
                  CmiSyncSend(pes[k], len, msg);
              }
          }
          CmiFree(msg);
          return;
      }
  }
#endif

#if CMK_BROADCAST_USE_CMIREFERENCE
  CmiSyncListSendFn(npes, pes, len, msg);
  CmiFree(msg);
#else
  {
      int k;
      if (npes <= 0) {
          CmiFree(msg);
          return;
      }
      for (k = 0; k < npes - 1; k++)
          CmiSyncSend(pes[k], len, msg);
      CmiSyncSendAndFree(pes[npes - 1], len, msg);
  }
#endif
}
1813 #endif
1814
/* Forward declaration; the definition appears further down in this file. */
static void PumpDatagramConnection();

/*
 * User-event ids for tracing.  These are the initial values; when tracing is
 * compiled in, registerUserTraceEvents() replaces them with the ids returned
 * by traceRegisterUserEvent().
 */
static int event_SetupConnect         = 111;
static int event_PumpSmsg             = 222 ;
static int event_PumpTransaction      = 333;
static int event_PumpRdmaTransaction  = 444;
static int event_SendBufferSmsg       = 484;
static int event_SendFmaRdmaMsg       = 555;
static int event_AdvanceCommunication = 666;
1823
/*
 * Register the machine layer's user trace events and remember the returned
 * ids in the event_* variables.  Compiles to an empty function unless
 * CMI_MPI_TRACE_USEREVENTS and CMK_TRACE_ENABLED are set and
 * CMK_TRACE_IN_CHARM is not.  The registration order is kept as-is.
 */
static void registerUserTraceEvents()
{
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    event_SetupConnect         = traceRegisterUserEvent("setting up connections", -1 );
    event_PumpSmsg             = traceRegisterUserEvent("Pump network small msgs", -1);
    event_PumpTransaction      = traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
    event_PumpRdmaTransaction  = traceRegisterUserEvent("Pump RDMA remote event" , -1);
    event_SendBufferSmsg       = traceRegisterUserEvent("Sending buffered small msgs", -1);
    event_SendFmaRdmaMsg       = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
    event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
#endif
}
1835
1836 static void ProcessDeadlock()
1837 {
1838     static CmiUInt8 *ptr = NULL;
1839     static CmiUInt8  last = 0, mysum, sum;
1840     static int count = 0;
1841     gni_return_t status;
1842     int i;
1843
1844 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1845 //sweep_mempool(CpvAccess(mempool));
1846     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1847     mysum = smsg_send_count + smsg_recv_count;
1848     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1849     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1850     GNI_RC_CHECK("PMI_Allgather", status);
1851     sum = 0;
1852     for (i=0; i<mysize; i++)  sum+= ptr[i];
1853     if (last == 0 || sum == last) 
1854         count++;
1855     else
1856         count = 0;
1857     last = sum;
1858     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1859     if (count == 2) { 
1860         /* detected twice, it is a real deadlock */
1861         if (myrank == 0)  {
1862             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1863             CmiAbort("Fatal> Deadlock detected.");
1864         }
1865
1866     }
1867     _detected_hang = 0;
1868 }
1869
1870 static void CheckProgress()
1871 {
1872     if (smsg_send_count == last_smsg_send_count &&
1873         smsg_recv_count == last_smsg_recv_count ) 
1874     {
1875         _detected_hang = 1;
1876 #if !CMK_SMP
1877         if (_detected_hang) ProcessDeadlock();
1878 #endif
1879
1880     }
1881     else {
1882         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1883         last_smsg_send_count = smsg_send_count;
1884         last_smsg_recv_count = smsg_recv_count;
1885         _detected_hang = 0;
1886     }
1887 }
1888
1889 static void set_limit()
1890 {
1891     //if (!user_set_flag && CmiMyRank() == 0) {
1892     if (CmiMyRank() == 0) {
1893         int mynode = CmiPhysicalNodeID(CmiMyPe());
1894         int numpes = CmiNumPesOnPhysicalNode(mynode);
1895         int numprocesses = numpes / CmiMyNodeSize();
1896         MAX_REG_MEM  = _totalmem / numprocesses;
1897         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1898         if (CmiMyPe() == 0)
1899            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1900         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1901         {
1902              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1903              CmiAbort("memory registration\n");
1904         }
1905     }
1906 }
1907
1908 void LrtsPostCommonInit(int everReturn)
1909 {
1910 #if CMK_DIRECT
1911     CmiDirectInit();
1912 #endif
1913 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1914     CpvInitialize(double, projTraceStart);
1915     /* only PE 0 needs to care about registration (to generate sts file). */
1916     //if (CmiMyPe() == 0) 
1917     {
1918         registerMachineUserEventsFunction(&registerUserTraceEvents);
1919     }
1920 #endif
1921
1922 #if !CMK_SMP
1923     if (useDynamicSMSG)
1924     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1925 #endif
1926
1927 #if ! LARGEPAGE
1928     if (_checkProgress)
1929 #if CMK_SMP
1930     if (CmiMyRank() == 0)
1931 #endif
1932     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1933 #endif
1934  
1935 #if !LARGEPAGE
1936     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1937 #endif
1938 }
1939
/*
 * This is called by worker threads.
 * Only does work when MULTI_THREAD_SEND is enabled: unless there is a single
 * process (mysize == 1), a worker whose rank satisfies rank % 6 == 3 drives
 * CmiMachineProgressImpl().  With CMK_SMP_TRACE_COMMTHREAD the call is
 * bracketed as an event_AdvanceCommunication user event.
 */
void LrtsPostNonLocal()
{
#if MULTI_THREAD_SEND
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif

    if (mysize == 1 || CmiMyRank() % 6 != 3)
        return;

#if CMK_SMP_TRACE_COMMTHREAD
    traceEndIdle();
    startT = CmiWallTimer();
#endif

    CmiMachineProgressImpl();

#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
    traceBeginIdle();
#endif
#endif  /* MULTI_THREAD_SEND */
}
1970
/* Network progress function: used to poll the network for messages.
   This flushes receive buffers on some implementations. */
#if CMK_MACHINE_PROGRESS_DEFINED
void CmiMachineProgressImpl()
{
#if ! CMK_SMP || MULTI_THREAD_SEND

    /* Active path.  Apart from the first line, each entry is a macro that is
       invoked without a trailing semicolon. */
    STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA

#if 0
    /* Disabled variant: with CMK_WORKER_SINGLE_TASK each of six progress
       tasks is assigned to the workers whose rank % 6 matches. */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 0)
#endif
    PumpNetworkSmsg();

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 1)
#endif
    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 2)
#endif
    PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);

#if REMOTE_EVENT
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 3)
#endif
    PumpRemoteTransactions(rdma_rx_cqh);         // rdma_rx_cqh
#endif

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 4)
#endif
    {
#if CMK_USE_OOB
    SendBufferMsg(&smsg_oob_queue, NULL);
    SendBufferMsg(&smsg_queue, &smsg_oob_queue);
#else
    SendBufferMsg(&smsg_queue, NULL);
#endif
    }

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 5)
#endif
#if CMK_SMP
    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
#else
    STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
#endif

#endif  /* 0 */
#endif  /* ! CMK_SMP || MULTI_THREAD_SEND */
}
#endif  /* CMK_MACHINE_PROGRESS_DEFINED */
2031
2032
2033 /* useDynamicSMSG */
2034 static void    PumpDatagramConnection()
2035 {
2036     uint32_t          remote_address;
2037     uint32_t          remote_id;
2038     gni_return_t status;
2039     gni_post_state_t  post_state;
2040     uint64_t          datagram_id;
2041     int i;
2042
2043    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2044    {
2045        if (datagram_id >= mysize) {           /* bound endpoint */
2046            int pe = datagram_id - mysize;
2047            CMI_GNI_LOCK(global_gni_lock)
2048            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2049            CMI_GNI_UNLOCK(global_gni_lock)
2050            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2051            {
2052                CmiAssert(remote_id == pe);
2053                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2054                GNI_RC_CHECK("Dynamic SMSG Init", status);
2055 #if PRINT_SYH
2056                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2057 #endif
2058                CmiAssert(smsg_connected_flag[pe] == 1);
2059                smsg_connected_flag[pe] = 2;
2060            }
2061        }
2062        else {         /* unbound ep */
2063            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2064            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2065            {
2066                CmiAssert(remote_id<mysize);
2067                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2068                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2069                GNI_RC_CHECK("Dynamic SMSG Init", status);
2070 #if PRINT_SYH
2071                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2072 #endif
2073                smsg_connected_flag[remote_id] = 2;
2074
2075                alloc_smsg_attr(&send_smsg_attr);
2076                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2077                GNI_RC_CHECK("post unbound datagram", status);
2078            }
2079        }
2080    }
2081 }
2082
/* Polling the CQ to receive network messages: currently an empty
   placeholder, the body performs no work. */
static void PumpNetworkRdmaMsgs()
{
}
2090
2091 INLINE_KEYWORD
2092 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2093 {
2094     RDMA_REQUEST        *rdma_request_msg;
2095     MallocRdmaRequest(rdma_request_msg);
2096     rdma_request_msg->destNode = inst_id;
2097     rdma_request_msg->pd = pd;
2098 #if REMOTE_EVENT
2099     rdma_request_msg->ack_index = ack_index;
2100 #endif
2101     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2102 }
2103
2104 static void getLargeMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2105 static void getPersistentMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2106 static void PRINT_CONTROL(void *header)
2107 {
2108     CONTROL_MSG *control_msg = (CONTROL_MSG *) header;
2109
2110     printf(" length=%d , seq_id = %d, addr = %lld:%lld:%lld  \n", control_msg->length, control_msg->seq_id, control_msg->source_addr, (control_msg->source_mem_hndl).qword1, (control_msg->source_mem_hndl).qword2 );
2111 #if PERSISTENT_GET_BASE
2112     printf(" memhdl = %lld:%lld:%lld \n", control_msg->dest_addr, (control_msg->dest_mem_hndl).qword1, (control_msg->dest_mem_hndl).qword2);
2113 #endif
2114 }
2115 static void PumpNetworkSmsg()
2116 {
2117     uint64_t            inst_id;
2118     gni_cq_entry_t      event_data;
2119     gni_return_t        status;
2120     void                *header;
2121     uint8_t             msg_tag;
2122     int                 msg_nbytes;
2123     void                *msg_data;
2124     gni_mem_handle_t    msg_mem_hndl;
2125     gni_smsg_attr_t     *smsg_attr;
2126     gni_smsg_attr_t     *remote_smsg_attr;
2127     int                 init_flag;
2128     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2129     uint64_t            source_addr;
2130     SMSG_QUEUE         *queue = &smsg_queue;
2131     PCQueue             tmp_queue;
2132 #if  CMK_DIRECT
2133     cmidirectMsg        *direct_msg;
2134 #endif
2135 #if CMI_PUMPNETWORKSMSG_CAP 
2136     int                  recv_cnt = 0;
2137     while(recv_cnt< PumpNetworkSmsg_cap) {
2138 #else
2139     while(1) {
2140 #endif
2141         CMI_GNI_LOCK(smsg_rx_cq_lock)
2142         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2143         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2144         if(status != GNI_RC_SUCCESS) break;
2145
2146         inst_id = GNI_CQ_GET_INST_ID(event_data);
2147 #if REMOTE_EVENT
2148         inst_id = GET_RANK(inst_id);      /* important */
2149 #endif
2150         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2151 #if PRINT_SYH
2152         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2153 #endif
2154         if (useDynamicSMSG) {
2155             /* subtle: smsg may come before connection is setup */
2156             while (smsg_connected_flag[inst_id] != 2) 
2157                PumpDatagramConnection();
2158         }
2159         msg_tag = GNI_SMSG_ANY_TAG;
2160         while(1) {
2161             CMI_GNI_LOCK(smsg_mailbox_lock)
2162             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2163             if (status != GNI_RC_SUCCESS)
2164             {
2165                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2166                 break;
2167             }
2168 #if         CMI_PUMPNETWORKSMSG_CAP
2169             recv_cnt++; 
2170 #endif
2171 #if PRINT_SYH
2172             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2173 #endif
2174             /* copy msg out and then put into queue (small message) */
2175             switch (msg_tag) {
2176             case SMALL_DATA_TAG:
2177             {
2178                 START_EVENT();
2179                 msg_nbytes = CmiGetMsgSize(header);
2180                 msg_data    = CmiAlloc(msg_nbytes);
2181                 memcpy(msg_data, (char*)header, msg_nbytes);
2182                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2183                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2184                 TRACE_COMM_CREATION(EVENT_TIME(), msg_data);
2185                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2186                 handleOneRecvedMsg(msg_nbytes, msg_data);
2187                 break;
2188             }
2189             case LMSG_PERSISTENT_INIT_TAG:
2190             {   CMI_GNI_UNLOCK(smsg_mailbox_lock)
2191                 getPersistentMsgRequest(header, inst_id, msg_tag, sendRdmaBuf);
2192                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2193                 break;
2194             }
2195             case LMSG_INIT_TAG:
2196             case LMSG_OOB_INIT_TAG:
2197             {
2198                 tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf; 
2199 #if MULTI_THREAD_SEND
2200                 MallocControlMsg(control_msg_tmp);
2201                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2202                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2203                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2204                 getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
2205                 FreeControlMsg(control_msg_tmp);
2206 #else
2207                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2208                 getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
2209                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2210 #endif
2211                 break;
2212             }
2213 #if !REMOTE_EVENT && !CQWRITE
2214             case ACK_TAG:   //msg fit into mempool
2215             {
2216                 /* Get is done, release message . Now put is not used yet*/
2217                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2218                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2219                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2220 #if ! USE_LRTS_MEMPOOL
2221                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2222 #else
2223                 DecreaseMsgInSend(msg);
2224 #endif
2225                 if(NoMsgInSend(msg))
2226                     buffered_send_msg -= GetMempoolsize(msg);
2227                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2228                 CmiFree(msg);
2229 #if CMI_EXERT_SEND_LARGE_CAP
2230                 SEND_large_pending--;
2231 #endif
2232                 break;
2233             }
2234 #endif
2235             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2236             {
2237 #if MULTI_THREAD_SEND
2238                 MallocControlMsg(header_tmp);
2239                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2240                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2241 #else
2242                 header_tmp = (CONTROL_MSG *) header;
2243 #endif
2244                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2245 #if CMI_EXERT_SEND_LARGE_CAP
2246                 SEND_large_pending--;
2247 #endif
2248                 void *msg = (void*)(header_tmp->source_addr);
2249                 int cur_seq = CmiGetMsgSeq(msg);
2250                 int offset = ONE_SEG*(cur_seq+1);
2251                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2252                 buffered_send_msg -= header_tmp->length;
2253                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2254                 if (remain_size < 0) remain_size = 0;
2255                 CmiSetMsgSize(msg, remain_size);
2256                 if(remain_size <= 0) //transaction done
2257                 {
2258                     CmiFree(msg);
2259                 }else if (header_tmp->total_length > offset)
2260                 {
2261                     CmiSetMsgSeq(msg, cur_seq+1);
2262                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2263                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2264                     //send next seg
2265                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2266                          // pipelining
2267                     if (header_tmp->seq_id == 1) {
2268                       int i;
2269                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2270                         int seq = cur_seq+i+2;
2271                         CmiSetMsgSeq(msg, seq-1);
2272                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2273                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2274                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2275                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2276                       }
2277                     }
2278                 }
2279 #if MULTI_THREAD_SEND
2280                 FreeControlMsg(header_tmp);
2281 #else
2282                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2283 #endif
2284                 break;
2285             }
2286 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE
2287             case PUT_DONE_TAG:  {   //persistent message
2288                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2289                 int size = ((CONTROL_MSG *) header)->length;
2290                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2291                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2292                 CmiReference(msg);
2293                 CMI_CHECK_CHECKSUM(msg, size);
2294                 handleOneRecvedMsg(size, msg); 
2295 #if PRINT_SYH
2296                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2297 #endif
2298                 break;
2299             }
2300 #endif
2301 #if CMK_DIRECT
2302             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2303                 //create a trigger message
2304                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2305                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2306                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2307                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2308                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2309                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2310                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2311                 break;
2312 #endif
2313             default:
2314                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2315                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2316                 printf("weird tag problem %d \n", msg_tag);
2317                 CmiAbort("Unknown tag\n");
2318             }               // end switch
2319 #if PRINT_SYH
2320             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2321 #endif
2322             smsg_recv_count ++;
2323             msg_tag = GNI_SMSG_ANY_TAG;
2324         } //endwhile GNI_SmsgGetNextWTag
2325     }   //end while GetEvent
2326     if(status == GNI_RC_ERROR_RESOURCE)
2327     {
2328         printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
2329         GNI_RC_CHECK("Smsg_rx_cq full", status);
2330     }
2331 }
2332
2333 static void printDesc(gni_post_descriptor_t *pd)
2334 {
2335     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2336 }
2337
#if CQWRITE
/*
 * Post a CQ-write of `data` to `destNode`, targeting the completion queue
 * associated with the remote memory handle `mem_hndl`.  The descriptor is
 * posted in silent CQ mode; the post result is checked with GNI_RC_CHECK.
 */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_post_descriptor_t *desc;
    gni_return_t           rc = GNI_RC_SUCCESS;

    MallocPostDesc(desc);
    desc->type            = GNI_POST_CQWRITE;
    desc->cq_mode         = GNI_CQMODE_SILENT;
    //desc->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
    desc->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    desc->cqwrite_value   = data;
    desc->remote_mem_hndl = mem_hndl;

    rc = GNI_PostCqWrite(ep_hndl_array[destNode], desc);
    GNI_RC_CHECK("GNI_PostCqWrite", rc);
}
#endif
2355
2356 // register memory for a message
2357 // return mem handle
2358 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2359 {
2360     gni_return_t status = GNI_RC_SUCCESS;
2361
2362     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2363
2364 #if CMK_PERSISTENT_COMM_PUT
2365       // persistent message is always registered
2366       // BIG_MSG small pieces do not have malloc chunk header
2367     if (IS_PERSISTENT_MEMORY(msg)) {
2368         *memh = GetMemHndl(msg);
2369         return GNI_RC_SUCCESS;
2370     }
2371 #endif
2372     if(seqno == 0 
2373 #if CMK_PERSISTENT_COMM_PUT
2374          || seqno == PERSIST_SEQ
2375 #endif
2376       )
2377     {
2378         if(IsMemHndlZero((GetMemHndl(msg))))
2379         {
2380             msg = (void*)(msg);
2381             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2382             if(status == GNI_RC_SUCCESS)
2383                 *memh = GetMemHndl(msg);
2384         }
2385         else {
2386             *memh = GetMemHndl(msg);
2387         }
2388     }
2389     else {
2390         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2391         status = registerMemory(msg, size, memh, NULL); 
2392     }
2393     return status;
2394 }
2395
2396 static void getPersistentMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2397 {
2398 #if   PERSISTENT_GET_BASE
2399     CONTROL_MSG         *request_msg;
2400     gni_return_t        status;
2401     gni_post_descriptor_t *pd;
2402     request_msg = (CONTROL_MSG *) header;
2403
2404     MallocPostDesc(pd);
2405     pd->cqwrite_value = request_msg->seq_id;
2406     pd->first_operand = ALIGN64(request_msg->length); //  total length
2407     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2408         pd->type            = GNI_POST_FMA_GET;
2409     else
2410         pd->type            = GNI_POST_RDMA_GET;
2411     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
2412     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2413     pd->length          = ALIGN64(request_msg->length);
2414     pd->local_addr      = (uint64_t) request_msg->dest_addr;
2415     pd->local_mem_hndl  = request_msg->dest_mem_hndl;
2416     pd->remote_addr     = (uint64_t) request_msg->source_addr;
2417     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2418     pd->src_cq_hndl     = 0;
2419     pd->rdma_mode       = 0;
2420     pd->amo_cmd         = 0;
2421 #if REMOTE_EVENT
2422     bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2423 #else
2424     bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2425 #endif
2426
2427 #endif
2428 }
2429
2430 // for BIG_MSG called on receiver side for receiving control message
2431 // LMSG_INIT_TAG
2432 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2433 {
2434 #if     USE_LRTS_MEMPOOL
2435     CONTROL_MSG         *request_msg;
2436     gni_return_t        status = GNI_RC_SUCCESS;
2437     void                *msg_data;
2438     gni_post_descriptor_t *pd;
2439     gni_mem_handle_t    msg_mem_hndl;
2440     int                 size, transaction_size, offset = 0;
2441     size_t              register_size = 0;
2442
2443     // initial a get to transfer data from the sender side */
2444     request_msg = (CONTROL_MSG *) header;
2445     size = request_msg->total_length;
2446     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2447     MallocPostDesc(pd);
2448 #if CMK_WITH_STATS 
2449     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2450 #endif
2451     if(request_msg->seq_id < 2)   {
2452         MACHSTATE2(8, "%d seq id in get large msg requrest %d\n", CmiMyRank(), request_msg->seq_id);
2453 #if CMK_SMP_TRACE_COMMTHREAD 
2454         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2455 #endif
2456         msg_data = CmiAlloc(size);
2457         CmiSetMsgSeq(msg_data, 0);
2458         _MEMCHECK(msg_data);
2459     }
2460     else {
2461         offset = ONE_SEG*(request_msg->seq_id-1);
2462         msg_data = (char*)request_msg->dest_addr + offset;
2463     }
2464    
2465     pd->cqwrite_value = request_msg->seq_id;
2466
2467     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2468     SetMemHndlZero(pd->local_mem_hndl);
2469     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2470     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2471         if(NoMsgInRecv( (void*)(msg_data)))
2472             register_size = GetMempoolsize((void*)(msg_data));
2473     }
2474
2475     pd->first_operand = ALIGN64(size);                   //  total length
2476
2477     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2478         pd->type            = GNI_POST_FMA_GET;
2479     else
2480         pd->type            = GNI_POST_RDMA_GET;
2481     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2482     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2483     pd->length          = transaction_size;
2484     pd->local_addr      = (uint64_t) msg_data;
2485     pd->remote_addr     = request_msg->source_addr + offset;
2486     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2487
2488     if (tag == LMSG_OOB_INIT_TAG) 
2489         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2490     else
2491     {
2492 #if MULTI_THREAD_SEND
2493         pd->src_cq_hndl     = rdma_tx_cqh;
2494 #else
2495         pd->src_cq_hndl     = 0;
2496 #endif
2497     }
2498
2499     pd->rdma_mode       = 0;
2500     pd->amo_cmd         = 0;
2501 #if CMI_EXERT_RECV_RDMA_CAP
2502     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2503 #endif
2504     //memory registration success
2505     if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
2506     {
2507         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2508         CMI_GNI_LOCK(lock)
2509 #if REMOTE_EVENT
2510         if( request_msg->seq_id == 0)
2511         {
2512             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2513             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2514             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2515         }
2516 #endif
2517
2518 #if CMK_WITH_STATS
2519         RDMA_TRY_SEND(pd->type)
2520 #endif
2521         if(pd->type == GNI_POST_RDMA_GET) 
2522         {
2523             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2524         }
2525         else
2526         {
2527             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2528         }
2529         CMI_GNI_UNLOCK(lock)
2530
2531         if(status == GNI_RC_SUCCESS )
2532         {
2533 #if CMI_EXERT_RECV_RDMA_CAP
2534             RDMA_pending++;
2535 #endif
2536             if(pd->cqwrite_value == 0)
2537             {
2538 #if MACHINE_DEBUG_LOG
2539                 buffered_recv_msg += register_size;
2540                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2541 #endif
2542                 IncreaseMsgInRecv(msg_data);
2543 #if CMK_SMP_TRACE_COMMTHREAD 
2544                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2545 #endif
2546             }
2547 #if  CMK_WITH_STATS
2548             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2549             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2550 #endif
2551         }
2552     }else if (status != GNI_RC_SUCCESS)
2553     {
2554         SetMemHndlZero((pd->local_mem_hndl));
2555     }
2556         if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
2557     {
2558 #if REMOTE_EVENT
2559         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2560 #else
2561         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2562 #endif
2563     }else if (status != GNI_RC_SUCCESS) {
2564         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2565         GNI_RC_CHECK("GetLargeAFter posting", status);
2566     }
2567 #else
2568     CONTROL_MSG         *request_msg;
2569     gni_return_t        status;
2570     void                *msg_data;
2571     gni_post_descriptor_t *pd;
2572     RDMA_REQUEST        *rdma_request_msg;
2573     gni_mem_handle_t    msg_mem_hndl;
2574     //int source;
2575     // initial a get to transfer data from the sender side */
2576     request_msg = (CONTROL_MSG *) header;
2577     msg_data = CmiAlloc(request_msg->length);
2578     _MEMCHECK(msg_data);
2579
2580     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2581
2582     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2583     {
2584         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2585     }
2586
2587     MallocPostDesc(pd);
2588     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2589         pd->type            = GNI_POST_FMA_GET;
2590     else
2591         pd->type            = GNI_POST_RDMA_GET;
2592     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2593     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2594     pd->length          = ALIGN64(request_msg->length);
2595     pd->local_addr      = (uint64_t) msg_data;
2596     pd->remote_addr     = request_msg->source_addr;
2597     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2598     if (tag == LMSG_OOB_INIT_TAG) 
2599         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2600     else
2601     {
2602 #if MULTI_THREAD_SEND
2603         pd->src_cq_hndl     = rdma_tx_cqh;
2604 #else
2605         pd->src_cq_hndl     = 0;
2606 #endif
2607     }
2608     pd->rdma_mode       = 0;
2609     pd->amo_cmd         = 0;
2610
2611     //memory registration successful
2612     if(status == GNI_RC_SUCCESS)
2613     {
2614         pd->local_mem_hndl  = msg_mem_hndl;
2615        
2616         if(pd->type == GNI_POST_RDMA_GET) 
2617         {
2618             CMI_GNI_LOCK(rdma_tx_cq_lock)
2619             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2620             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2621         }
2622         else
2623         {
2624             CMI_GNI_LOCK(default_tx_cq_lock)
2625             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2626             CMI_GNI_UNLOCK(default_tx_cq_lock)
2627         }
2628
2629     }else
2630     {
2631         SetMemHndlZero(pd->local_mem_hndl);
2632     }
2633     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2634     {
2635         MallocRdmaRequest(rdma_request_msg);
2636         rdma_request_msg->next = 0;
2637         rdma_request_msg->destNode = inst_id;
2638         rdma_request_msg->pd = pd;
2639         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2640     }else {
2641         GNI_RC_CHECK("AFter posting", status);
2642     }
2643 #endif
2644 }
2645
#if CQWRITE
/*
 * Drain rdma_rx_cqh.  The low 48 bits of each event's data are taken as a
 * message pointer.
 *
 * With CMK_PERSISTENT_COMM_PUT, a message whose memory-handle field is
 * non-zero is referenced and delivered through handleOneRecvedMsg().
 * Every other event is handled as an acknowledgement: the send-side
 * bookkeeping is updated and the message is freed.
 *
 * The loop ends on the first GNI_CqGetEvent() result other than
 * GNI_RC_SUCCESS; GNI_RC_ERROR_RESOURCE is then passed to GNI_RC_CHECK.
 */
static void PumpCqWriteTransactions()
{
    gni_cq_entry_t  cq_ev;
    gni_return_t    rc;

    for (rc = GNI_CqGetEvent(rdma_rx_cqh, &cq_ev);
         rc == GNI_RC_SUCCESS;
         rc = GNI_CqGetEvent(rdma_rx_cqh, &cq_ev))
    {
        void *msg = (void*) ( GNI_CQ_GET_DATA(cq_ev) & 0xFFFFFFFFFFFFL);

#if CMK_PERSISTENT_COMM_PUT
#if PRINT_SYH
        printf(" %d CQ write event %p\n", myrank, msg);
#endif
        if (!IsMemHndlZero(MEMHFIELD(msg))) {
            int msg_size;
#if PRINT_SYH
            printf(" %d Persistent CQ write event %p\n", myrank, msg);
#endif
            CmiReference(msg);
            msg_size = CmiGetMsgSize(msg);
            CMI_CHECK_CHECKSUM(msg, msg_size);
            handleOneRecvedMsg(msg_size, msg);
            continue;
        }
#endif

        /* acknowledgement for a message this side sent */
#if USE_LRTS_MEMPOOL
        DecreaseMsgInSend(msg);
#endif
        if (NoMsgInSend(msg)) {
            buffered_send_msg -= GetMempoolsize(msg);
        }
        CmiFree(msg);
    }

    if (rc == GNI_RC_ERROR_RESOURCE)
    {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", rc);
    }
}
#endif
2690
#if REMOTE_EVENT
/*
 * Drain remote-completion events from rx_cqh.
 *
 * Each event's instance id is split with GET_INDEX / GET_TYPE:
 *   type 0 (ACK)        - index is a slot in ackPool holding a message this
 *                         side sent; the message is released and the slot is
 *                         freed.
 *   type 1 (PERSISTENT) - only with CMK_PERSISTENT_COMM_PUT; index is a slot
 *                         in persistPool whose current destination buffer is
 *                         delivered through handleOneRecvedMsg().
 *   anything else       - fatal.
 *
 * With CMI_PUMPREMOTETRANSACTIONS_CAP the loop also stops once more than
 * PumpRemoteTransactions_cap events have been handled.  Otherwise it ends on
 * the first GNI_CqGetEvent() result other than GNI_RC_SUCCESS;
 * GNI_RC_ERROR_RESOURCE is reported and passed to GNI_RC_CHECK.
 */
static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
{
    gni_cq_entry_t  cq_ev;
    gni_return_t    rc;
    void           *msg;
    int             event_id, index, type;
#if CMI_PUMPREMOTETRANSACTIONS_CAP
    int             handled = 0;
#endif

    for (;;) {
#if CMI_PUMPREMOTETRANSACTIONS_CAP
        if (handled > PumpRemoteTransactions_cap) break;
#endif
        CMI_GNI_LOCK(global_gni_lock)
        rc = GNI_CqGetEvent(rx_cqh, &cq_ev);
        CMI_GNI_UNLOCK(global_gni_lock)

        if (rc != GNI_RC_SUCCESS) break;

#if CMI_PUMPREMOTETRANSACTIONS_CAP
        handled++;
#endif

        event_id = GNI_CQ_GET_INST_ID(cq_ev);
        index    = GET_INDEX(event_id);
        type     = GET_TYPE(event_id);

        if (type == 0) {
            /* ACK: the remote side finished fetching one of our messages */
            CmiAssert(index>=0 && index<ackPool.size);
            CMI_GNI_LOCK(ackPool.lock);
            msg = GetIndexAddress(ackPool, index);
            CMI_GNI_UNLOCK(ackPool.lock);
#if PRINT_SYH
            MACHSTATE4(8,"[%d] PumpRemoteTransactions: ack: %lld index: %d type: %d.\n", myrank, msg, index, type);
#endif
#if USE_LRTS_MEMPOOL
            DecreaseMsgInSend(msg);
#endif
            if (NoMsgInSend(msg)) {
                buffered_send_msg -= GetMempoolsize(msg);
            }
            CmiFree(msg);
            IndexPool_freeslot(&ackPool, index);
#if CMI_EXERT_SEND_LARGE_CAP
            SEND_large_pending--;
#endif
        }
#if CMK_PERSISTENT_COMM_PUT
        else if (type == 1) {
            /* PERSISTENT: data has landed in a persistent receive buffer */
            int size;
            CmiLock(persistPool.lock);
            CmiAssert(GetIndexType(persistPool, index) == 2);
            PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
            CmiUnlock(persistPool.lock);
            START_EVENT();
            msg = slot->destBuf[slot->addrIndex].destAddress;
            size = CmiGetMsgSize(msg);
            CmiReference(msg);
            CMI_CHECK_CHECKSUM(msg, size);
            TRACE_COMM_CREATION(EVENT_TIME(), msg);
            handleOneRecvedMsg(size, msg);
        }
#endif
        else {
            fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
            CmiAbort("PumpRemoteTransactions: unknown type");
        }
    }

    if (rc == GNI_RC_ERROR_RESOURCE)
    {
        printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
        GNI_RC_CHECK("PumpRemoteTransactions: rx_cqh full", rc);
    }
}
#endif
2772
2773 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2774 {
2775     gni_cq_entry_t          ev;
2776     gni_return_t            status;
2777     uint64_t                type, inst_id;
2778     gni_post_descriptor_t   *tmp_pd;
2779     MSG_LIST                *ptr;
2780     CONTROL_MSG             *ack_msg_tmp;
2781     ACK_MSG                 *ack_msg;
2782     uint8_t                 msg_tag;
2783 #if CMK_DIRECT
2784     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2785 #endif
2786     SMSG_QUEUE         *queue = &smsg_queue;
2787 #if CMI_PUMPLOCALTRANSACTIONS_CAP
2788     int         pump_count = 0;
2789     while(pump_count < PumpLocalTransactions_cap) {
2790         pump_count++;
2791 #else
2792     while(1) {
2793 #endif
2794         CMI_GNI_LOCK(my_cq_lock) 
2795         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2796         CMI_GNI_UNLOCK(my_cq_lock)
2797         if(status != GNI_RC_SUCCESS) break;
2798         
2799         type = GNI_CQ_GET_TYPE(ev);
2800         if (type == GNI_CQ_EVENT_TYPE_POST)
2801         {
2802
2803 #if CMI_EXERT_RECV_RDMA_CAP
2804             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2805             RDMA_pending--;
2806 #endif
2807             inst_id     = GNI_CQ_GET_INST_ID(ev);
2808 #if PRINT_SYH
2809             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2810 #endif
2811             CMI_GNI_LOCK(my_cq_lock)
2812             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2813             CMI_GNI_UNLOCK(my_cq_lock)
2814
2815             switch (tmp_pd->type) {
2816 #if CMK_PERSISTENT_COMM_PUT  || CMK_DIRECT
2817             case GNI_POST_RDMA_PUT:
2818 #if CMK_PERSISTENT_COMM_PUT && ! USE_LRTS_MEMPOOL
2819                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2820 #endif
2821             case GNI_POST_FMA_PUT:
2822                 if(tmp_pd->amo_cmd == 1) {
2823 #if CMK_DIRECT
2824                     //sender ACK to receiver to trigger it is done
2825                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2826                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2827                     msg_tag = DIRECT_PUT_DONE_TAG;
2828 #endif
2829                 }
2830                 else {
2831                     CmiFree((void *)tmp_pd->local_addr);
2832 #if REMOTE_EVENT
2833                     FreePostDesc(tmp_pd);
2834                     continue;
2835 #elif CQWRITE
2836                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2837                     FreePostDesc(tmp_pd);
2838                     continue;
2839 #else
2840                     MallocControlMsg(ack_msg_tmp);
2841                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2842                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2843                     ack_msg_tmp->length  = tmp_pd->length;
2844                     msg_tag = PUT_DONE_TAG;
2845 #endif
2846                 }
2847                 break;
2848 #endif
2849             case GNI_POST_RDMA_GET:
2850             case GNI_POST_FMA_GET:  {
2851                 MACHSTATE2(8, "PumpLocal Get done %lld=>%lld\n", tmp_pd->local_addr, tmp_pd->remote_addr);
2852 #if  ! USE_LRTS_MEMPOOL
2853                 MallocControlMsg(ack_msg_tmp);
2854                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2855                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2856                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2857                 msg_tag = ACK_TAG;  
2858 #else
2859 #if CMK_WITH_STATS
2860                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2861 #endif
2862                 int seq_id = tmp_pd->cqwrite_value;
2863                 if(seq_id > 0)      // BIG_MSG
2864                 {
2865                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2866                     MallocControlMsg(ack_msg_tmp);
2867                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2868                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2869                     ack_msg_tmp->seq_id = seq_id;
2870                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2871                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2872                     ack_msg_tmp->length = tmp_pd->length;
2873                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2874                     msg_tag = BIG_MSG_TAG; 
2875                 } 
2876                 else
2877                 {
2878                     if(seq_id < 0)
2879                         CmiReference((void*)tmp_pd->local_addr);
2880                     msg_tag = ACK_TAG; 
2881 #if  !REMOTE_EVENT && !CQWRITE
2882                     MallocAckMsg(ack_msg);
2883                     ack_msg->source_addr = tmp_pd->remote_addr;
2884 #endif
2885                 }
2886 #endif
2887                 break;
2888             }
2889             case  GNI_POST_CQWRITE:
2890                    FreePostDesc(tmp_pd);
2891                    continue;
2892             default:
2893                 CmiPrintf("type=%d\n", tmp_pd->type);
2894                 CmiAbort("PumpLocalTransactions: unknown type!");
2895             }      /* end of switch */
2896
2897 #if CMK_DIRECT
2898             if (tmp_pd->amo_cmd == 1) {
2899                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2900                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2901             }
2902             else
2903 #endif
2904             if (msg_tag == ACK_TAG) {
2905 #if !REMOTE_EVENT
2906 #if   !CQWRITE
2907                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2908                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2909 #else
2910                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2911 #endif
2912 #endif
2913             }
2914             else {
2915                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2916                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2917             }
2918 #if CMK_PERSISTENT_COMM_PUT
2919             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2920 #endif
2921             {
2922                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2923 #if PRINT_SYH
2924                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2925 #endif
2926                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2927                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2928
2929                     START_EVENT();
2930                     //CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2931                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2932 #if MACHINE_DEBUG_LOG
2933                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2934                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2935                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2936 #endif
2937                     TRACE_COMM_CREATION(EVENT_TIME(), (void*)tmp_pd->local_addr);
2938                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2939                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2940                 }else if(msg_tag == BIG_MSG_TAG){
2941                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2942                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2943                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2944                         START_EVENT();
2945 #if PRINT_SYH
2946                         printf("Pipeline msg done [%d]\n", myrank);
2947 #endif
2948 #if     CMK_SMP_TRACE_COMMTHREAD
2949                         if( tmp_pd->cqwrite_value == 1)
2950                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2951 #endif
2952                         TRACE_COMM_CREATION(EVENT_TIME(), msg);
2953                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2954                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2955                     }
2956                 }
2957             }
2958             FreePostDesc(tmp_pd);
2959         }
2960     } //end while
2961     if(status == GNI_RC_ERROR_RESOURCE)
2962     {
2963         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2964         GNI_RC_CHECK("Smsg_tx_cq full", status);
2965     }
2966 }
2967
2968 static void  SendRdmaMsg( BufferList sendqueue)
2969 {
2970     gni_return_t            status = GNI_RC_SUCCESS;
2971     gni_mem_handle_t        msg_mem_hndl;
2972     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2973     RDMA_REQUEST            *pre = 0;
2974     uint64_t                register_size = 0;
2975     void                    *msg;
2976     int                     i;
2977
2978     int len = PCQueueLength(sendqueue);
2979     for (i=0; i<len; i++)
2980     {
2981 #if CMI_EXERT_RECV_RDMA_CAP
2982         if( RDMA_pending >= RDMA_cap) break;
2983 #endif
2984         CMI_PCQUEUEPOP_LOCK( sendqueue)
2985         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
2986         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
2987         if (ptr == NULL) break;
2988         
2989         gni_post_descriptor_t *pd = ptr->pd;
2990         
2991         msg = (void*)(pd->local_addr);
2992         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2993         register_size = 0;
2994         if(pd->cqwrite_value == 0) {
2995             if(NoMsgInRecv(msg))
2996                 register_size = GetMempoolsize(msg);
2997         }
2998
2999         if(status == GNI_RC_SUCCESS)        //mem register good
3000         {
3001             int destNode = ptr->destNode;
3002             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
3003             CMI_GNI_LOCK(lock);
3004 #if REMOTE_EVENT
3005             if( pd->cqwrite_value == 0 || pd->cqwrite_value == -1) {
3006                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3007                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
3008                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3009             }
3010 #if CMK_PERSISTENT_COMM_PUT
3011             else if (pd->cqwrite_value == PERSIST_SEQ) {
3012                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3013                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
3014                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3015             }
3016 #endif
3017 #if CMK_DIRECT
3018             else if (pd->cqwrite_value == DIRECT_SEQ) {
3019                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3020                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, DIRECT_EVENT(ptr->ack_index));
3021                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3022             }
3023 #endif
3024
3025 #endif
3026 #if CMK_WITH_STATS
3027             RDMA_TRY_SEND(pd->type)
3028 #endif
3029 #if CMK_SMP_TRACE_COMMTHREAD
3030             if(IS_PUT(pd->type))
3031             {
3032                  START_EVENT();
3033                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
3034             }
3035 #endif
3036             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
3037             {
3038                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3039             }
3040             else
3041             {
3042                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3043             }
3044             CMI_GNI_UNLOCK(lock);
3045             
3046             if(status == GNI_RC_SUCCESS)    //post good
3047             {
3048                 MACHSTATE4(8, "post noempty-rdma  %d (%lld==%lld,%d) \n", ptr->destNode, pd->local_addr, pd->remote_addr,  register_memory_size); 
3049 #if CMI_EXERT_RECV_RDMA_CAP
3050                 RDMA_pending ++;
3051 #endif
3052                 if(pd->cqwrite_value <= 0)
3053                 {
3054 #if CMK_SMP_TRACE_COMMTHREAD 
3055                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3056 #endif
3057                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3058                 }
3059 #if  CMK_WITH_STATS
3060                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3061                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3062 #endif
3063 #if MACHINE_DEBUG_LOG
3064                 buffered_recv_msg += register_size;
3065                 MACHSTATE(8, "GO request from buffered\n"); 
3066 #endif
3067 #if PRINT_SYH
3068                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
3069 #endif
3070                 FreeRdmaRequest(ptr);
3071             }else           // cannot post
3072             {
3073                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3074 #if PRINT_SYH
3075                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3076 #endif
3077                 break;
3078             }
3079         } else          //memory registration fails
3080         {
3081             PCQueuePush(sendqueue, (char*)ptr);
3082         }
3083     } //end while
3084 }
3085
3086 static 
3087 INLINE_KEYWORD gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
3088 {
3089     CONTROL_MSG         *control_msg_tmp;
3090     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
3091
3092     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3093     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
3094             /* connection not exists yet */
3095 #if CMK_SMP
3096             /* non-smp case, connect is issued in send_smsg_message */
3097         if (smsg_connected_flag[ptr->destNode] == 0)
3098             connect_to(ptr->destNode); 
3099 #endif
3100     }
3101     else
3102     switch(ptr->tag)
3103     {
3104     case SMALL_DATA_TAG:
3105         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3106         if(status == GNI_RC_SUCCESS)
3107         {
3108             CmiFree(ptr->msg);
3109         }
3110         break;
3111     case LMSG_PERSISTENT_INIT_TAG:
3112     case LMSG_INIT_TAG:
3113     case LMSG_OOB_INIT_TAG:
3114         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3115         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
3116         break;
3117 #if !REMOTE_EVENT && !CQWRITE
3118     case ACK_TAG:
3119         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3120         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3121         break;
3122 #endif
3123     case BIG_MSG_TAG:
3124         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3125         if(status == GNI_RC_SUCCESS)
3126         {
3127             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3128         }
3129         break;
3130 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE 
3131     case PUT_DONE_TAG:
3132         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3133         if(status == GNI_RC_SUCCESS)
3134         {
3135             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3136         }
3137         break;
3138 #endif
3139 #if CMK_DIRECT
3140     case DIRECT_PUT_DONE_TAG:
3141         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3142         if(status == GNI_RC_SUCCESS)
3143         {
3144             free((CMK_DIRECT_HEADER*)ptr->msg);
3145         }
3146         break;
3147 #endif
3148     default:
3149         printf("Weird tag\n");
3150         CmiAbort("should not happen\n");
3151     }       // end switch
3152     return status;
3153 }
3154
3155 // return 1 if all messages are sent
3156
3157 #if ONE_SEND_QUEUE
3158
3159 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3160 {
3161     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3162     CONTROL_MSG         *control_msg_tmp;
3163     gni_return_t        status;
3164     int                 done = 1;
3165     uint64_t            register_size;
3166     void                *register_addr;
3167     int                 index_previous = -1;
3168 #if     CMI_SENDBUFFERSMSG_CAP
3169     int                 sent_length = 0;
3170 #endif
3171     int          index = 0;
3172     memset(destpe_avail, 0, mysize * sizeof(char));
3173     for (index=0; index<1; index++)
3174     {
3175         int i, len = PCQueueLength(queue->sendMsgBuf);
3176         for (i=0; i<len; i++) 
3177         {
3178             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3179             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3180             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3181             if(ptr == NULL) break;
3182             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3183                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3184                 continue;
3185             }
3186             status = _sendOneBufferedSmsg(queue, ptr);
3187 #if CMI_SENDBUFFERSMSG_CAP
3188             sent_length++;
3189 #endif
3190             if(status == GNI_RC_SUCCESS)
3191             {
3192 #if PRINT_SYH
3193                 buffered_smsg_counter--;
3194                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3195 #endif
3196                 FreeMsgList(ptr);
3197             }else {
3198                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3199                 done = 0;
3200                 if(status == GNI_RC_ERROR_RESOURCE)
3201                 {
3202                     destpe_avail[ptr->destNode] = 1;
3203                 }
3204             } 
3205         } //end while
3206     }   // end pooling for all cores
3207     return done;
3208 }
3209
3210 #else   /*  ! ONE_SEND_QUEUE  */
3211
/**
 * Try to transmit buffered small messages from the per-destination queues
 * (build without ONE_SEND_QUEUE).
 *
 * The outer loop header and the way the current destination queue is chosen
 * differ between SMP_LOCKS and non-SMP_LOCKS builds; the inner send loop and
 * the closing brace of the outer loop are shared by both.
 *
 * @param queue       queue set to drain
 * @param prio_queue  optional (may be NULL); a destination that still has
 *                    messages pending in prio_queue is skipped for this pass
 * @return 1 if no send failed and the cap (if enabled) was not reached,
 *         0 otherwise.  Destinations skipped because of prio_queue do not
 *         change the return value.
 */
3212 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3213 {
3214     MSG_LIST            *ptr;
3215     gni_return_t        status;
3216     int                 done = 1;
3217 #if     CMI_SENDBUFFERSMSG_CAP
3218     int                 sent_length = 0;
3219 #endif
3220     int idx;
3221 #if SMP_LOCKS
         /* SMP_LOCKS: visit only the lists currently on nonEmptyQueues, at most
          * as many as were present on entry.  `index` is advanced and wrapped
          * below but is not otherwise read in this branch. */
3222     int          index = -1;
3223     int nonempty = PCQueueLength(queue->nonEmptyQueues);
3224     for(idx =0; idx<nonempty; idx++) 
3225     {
3226         index++;  if (index >= nonempty) index = 0;
3227 #if CMI_SENDBUFFERSMSG_CAP
             /* cap reached: stop before popping anything, report "not done" */
3228         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3229 #endif
3230         CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
3231         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
3232         CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
3233         if(current_list == NULL) break; 
             /* priority traffic pending for this destination: put the list back
              * untouched and move on (done is left as is) */
3234         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
3235             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3236             continue;
3237         }
3238         PCQueue current_queue= current_list->sendSmsgBuf;
             /* under the list lock: snapshot the length and clear `pushed`, which
              * records that the list has been taken off nonEmptyQueues */
3239         CmiLock(current_list->lock);
3240         int i, len = PCQueueLength(current_queue);
3241         current_list->pushed = 0;
3242         CmiUnlock(current_list->lock);
3243 #else      /* ! SMP_LOCKS */
         /* Without SMP_LOCKS: walk all mysize destination queues.  `index` is
          * static, so each call resumes after the destination where the
          * previous call stopped. */
3244     static int          index = -1;
3245     for(idx =0; idx<mysize; idx++) 
3246     {
3247         index++;  if (index == mysize) index = 0;
3248 #if CMI_SENDBUFFERSMSG_CAP
             /* cap reached: report "not done" */
3249         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3250 #endif
3251         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
3252         //if (index == myrank) continue;
3253         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3254         int i, len = PCQueueLength(current_queue);
3255 #endif
             /* Shared by both builds: attempt at most `len` sends from the
              * selected destination queue. */
3256         for (i=0; i<len; i++)  {
3257             CMI_PCQUEUEPOP_LOCK(current_queue)
3258             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3259             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3260             if (ptr == 0) break;
3261
3262             status = _sendOneBufferedSmsg(queue, ptr);
3263 #if CMI_SENDBUFFERSMSG_CAP
3264             sent_length++;
3265 #endif
3266             if(status == GNI_RC_SUCCESS)
3267             {
3268 #if PRINT_SYH
3269                 buffered_smsg_counter--;
3270                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3271 #endif
3272                 FreeMsgList(ptr);
3273             }else {
                     /* send failed: keep the message for a later pass; on
                      * GNI_RC_ERROR_RESOURCE stop working on this destination */
3274                 PCQueuePush(current_queue, (char*)ptr);
3275                 done = 0;
3276                 if(status == GNI_RC_ERROR_RESOURCE)
3277                 {
3278                     break;
3279                 }
3280             } 
3281         } //end for i
3282 #if SMP_LOCKS
             /* if messages remain and the list has not been put back on
              * nonEmptyQueues since `pushed` was cleared above, put it back */
3283         CmiLock(current_list->lock);
3284         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3285         {
3286             current_list->pushed = 1;
3287             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3288         }
3289         CmiUnlock(current_list->lock); 
3290 #endif
3291     }   // end pooling for all cores
3292     return done;
3293 }
3294
3295 #endif
3296
3297 static void ProcessDeadlock();
/**
 * Run one pass of the machine layer's communication progress stages.
 *
 * Stages, in order:
 *   1. PumpDatagramConnection()   (only if useDynamicSMSG && whileidle)
 *   2. PumpNetworkSmsg()
 *   3. SendBufferMsg() on smsg_queue (with smsg_oob_queue as the priority
 *      queue when CMK_USE_OOB is set, NULL otherwise)
 *   4. PumpLocalTransactions() on default_tx_cqh, and on rdma_tx_cqh when
 *      MULTI_THREAD_SEND is set
 *   5. PumpCqWriteTransactions() (CQWRITE) / PumpRemoteTransactions() (REMOTE_EVENT)
 *   6. SendRdmaMsg(sendRdmaBuf)
 *   7. ProcessDeadlock() if _detected_hang (CMK_SMP && !LARGEPAGE builds only)
 *
 * The macro group SEND_OOB_SMSG / PUMP_REMOTE_HIGHPRIORITY /
 * PUMP_LOCAL_HIGHPRIORITY / POST_HIGHPRIORITY_RDMA is invoked between the
 * stages; those macros are defined elsewhere in the file.
 *
 * With CMK_SMP_TRACE_COMMTHREAD, each stage is timed with CmiWallTimer() and
 * reported through traceUserBracketEvent() when it took at least
 * TRACE_THRESHOLD.
 *
 * @param whileidle  when non-zero (and useDynamicSMSG is set) the datagram
 *                   connection pump runs; not read anywhere else in this body
 */
3298 void LrtsAdvanceCommunication(int whileidle)
3299 {
         /* NOTE(review): `count` has no direct reference in this function; the
          * macros used below are not visible here — confirm before removing. */
3300     static int count = 0;
3301     /*  Receive Msg first */
3302 #if CMK_SMP_TRACE_COMMTHREAD
3303     double startT, endT;
3304 #endif
3305     if (useDynamicSMSG && whileidle)
3306     {
3307 #if CMK_SMP_TRACE_COMMTHREAD
3308         startT = CmiWallTimer();
3309 #endif
3310         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3311 #if CMK_SMP_TRACE_COMMTHREAD
3312         endT = CmiWallTimer();
3313         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3314 #endif
3315     }
3316
3317     SEND_OOB_SMSG(smsg_oob_queue)
3318     PUMP_REMOTE_HIGHPRIORITY
3319     PUMP_LOCAL_HIGHPRIORITY
3320     POST_HIGHPRIORITY_RDMA
3321     // Receiving small messages and persistent
3322 #if CMK_SMP_TRACE_COMMTHREAD
3323     startT = CmiWallTimer();
3324 #endif
3325     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3326 #if CMK_SMP_TRACE_COMMTHREAD
3327     endT = CmiWallTimer();
3328     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3329 #endif
3330
3331     SEND_OOB_SMSG(smsg_oob_queue)
3332     PUMP_REMOTE_HIGHPRIORITY
3333     PUMP_LOCAL_HIGHPRIORITY
3334     POST_HIGHPRIORITY_RDMA
3335     
3336     ///* Send buffered Message */
3337 #if CMK_SMP_TRACE_COMMTHREAD
3338     startT = CmiWallTimer();
3339 #endif
         /* the return value of SendBufferMsg() is not inspected in this function */
3340 #if CMK_USE_OOB
3341     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
3342 #else
3343     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
3344 #endif
3345 #if CMK_SMP_TRACE_COMMTHREAD
3346     endT = CmiWallTimer();
3347     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3348 #endif
3349
3350     SEND_OOB_SMSG(smsg_oob_queue)
3351     PUMP_REMOTE_HIGHPRIORITY
3352     PUMP_LOCAL_HIGHPRIORITY
3353     POST_HIGHPRIORITY_RDMA
3354
3355     //Pump Get messages or PUT messages
3356 #if CMK_SMP_TRACE_COMMTHREAD
3357     startT = CmiWallTimer();
3358 #endif
3359     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3360 #if MULTI_THREAD_SEND
3361     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3362 #endif
3363 #if CMK_SMP_TRACE_COMMTHREAD
3364     endT = CmiWallTimer();
3365     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3366 #endif
3367     
3368     SEND_OOB_SMSG(smsg_oob_queue)
3369     PUMP_REMOTE_HIGHPRIORITY
3370     PUMP_LOCAL_HIGHPRIORITY
3371     POST_HIGHPRIORITY_RDMA
3372     //Pump Remote event
3373 #if CMK_SMP_TRACE_COMMTHREAD
3374     startT = CmiWallTimer();
3375 #endif
3376 #if CQWRITE
3377     PumpCqWriteTransactions();
3378 #endif
3379 #if REMOTE_EVENT
3380     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
3381 #endif
3382 #if CMK_SMP_TRACE_COMMTHREAD
3383     endT = CmiWallTimer();
3384     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3385 #endif
3386
3387     SEND_OOB_SMSG(smsg_oob_queue)
3388     PUMP_REMOTE_HIGHPRIORITY
3389     PUMP_LOCAL_HIGHPRIORITY
3390     POST_HIGHPRIORITY_RDMA
3391
         /* Post queued RDMA work from sendRdmaBuf */
3392 #if CMK_SMP_TRACE_COMMTHREAD
3393     startT = CmiWallTimer();
3394 #endif
3395     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3396 #if CMK_SMP_TRACE_COMMTHREAD
3397     endT = CmiWallTimer();
3398     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3399 #endif
3400
         /* Deadlock handling is compiled in only for SMP builds without LARGEPAGE */
3401 #if CMK_SMP && ! LARGEPAGE
3402     if (_detected_hang)  ProcessDeadlock();
3403 #endif
3404 }
3405
3406 static void set_smsg_max()
3407 {
3408     char *env;
3409
3410     if(mysize <=512)
3411     {
3412         SMSG_MAX_MSG = 1024;
3413     }else if (mysize <= 4096)
3414     {
3415         SMSG_MAX_MSG = 1024;
3416     }else if (mysize <= 16384)
3417     {
3418         SMSG_MAX_MSG = 512;
3419     }else {
3420         SMSG_MAX_MSG = 256;
3421     }
3422
3423     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3424     if (env) SMSG_MAX_MSG = atoi(env);
3425     CmiAssert(SMSG_MAX_MSG > 0);
3426 }    
3427
3428 /* useDynamicSMSG */
3429 static void _init_dynamic_smsg()
3430 {
3431     gni_return_t status;
3432     uint32_t     vmdh_index = -1;
3433     int i;
3434
3435     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3436     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3437     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3438     for(i=0; i<mysize; i++) {
3439         smsg_connected_flag[i] = 0;
3440         smsg_attr_vector_local[i] = NULL;
3441         smsg_attr_vector_remote[i] = NULL;
3442     }
3443
3444     set_smsg_max();
3445
3446     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3447     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3448     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3449     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3450     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3451
3452     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3453     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3454     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3455     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3456     mailbox_list->offset = 0;
3457     mailbox_list->next = 0;
3458     
3459     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3460         mailbox_list->size, smsg_rx_cqh,
3461         GNI_MEM_READWRITE,   
3462         vmdh_index,
3463         &(mailbox_list->mem_hndl));
3464     GNI_RC_CHECK("MEMORY registration for smsg", status);
3465
3466     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3467     GNI_RC_CHECK("Unbound EP", status);
3468     
3469     alloc_smsg_attr(&send_smsg_attr);
3470
3471     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3472     GNI_RC_CHECK("post unbound datagram", status);
3473
3474       /* always pre-connect to proc 0 */
3475     //if (myrank != 0) connect_to(0);
3476
3477     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3478     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3479 }
3480
3481 static void _init_static_smsg()
3482 {
3483     gni_smsg_attr_t      *smsg_attr;
3484     gni_smsg_attr_t      remote_smsg_attr;
3485     gni_smsg_attr_t      *smsg_attr_vec;
3486     gni_mem_handle_t     my_smsg_mdh_mailbox;
3487     int      ret, i;
3488     gni_return_t status;
3489     uint32_t              vmdh_index = -1;
3490     mdh_addr_t            base_infor;
3491     mdh_addr_t            *base_addr_vec;
3492
3493     set_smsg_max();
3494     
3495     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3496     
3497     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3498     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3499     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3500     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3501     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3502     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3503     CmiAssert(ret == 0);
3504     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3505     
3506     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3507             smsg_memlen*(mysize), smsg_rx_cqh,
3508             GNI_MEM_READWRITE,   
3509             vmdh_index,
3510             &my_smsg_mdh_mailbox);
3511     register_memory_size += smsg_memlen*(mysize);
3512     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3513
3514     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3515     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3516
3517     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3518     base_infor.mdh =  my_smsg_mdh_mailbox;
3519     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3520
3521     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3522  
3523     for(i=0; i<mysize; i++)
3524     {
3525         if(i==myrank)
3526             continue;
3527         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3528         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3529         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3530         smsg_attr[i].mbox_offset = i*smsg_memlen;
3531         smsg_attr[i].buff_size = smsg_memlen;
3532         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3533         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3534     }
3535
3536     for(i=0; i<mysize; i++)
3537     {
3538         if (myrank == i) continue;