f41923dba4c26d180a00e8ed0864fe0f8b328f57
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
43
44 #include "converse.h"
45
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
49
50 #define     LARGEPAGE              1
51
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND          0
54 #define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
55 #endif
56
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK     1
59 #endif
60
61 #define REMOTE_EVENT               1
62 #define CQWRITE                    0
63
64 #define CMI_EXERT_SEND_LARGE_CAP   0
65 #define CMI_EXERT_RECV_RDMA_CAP    0
66
67
68 #define CMI_SENDBUFFERSMSG_CAP            0
69 #define CMI_PUMPNETWORKSMSG_CAP           0
70 #define CMI_PUMPREMOTETRANSACTIONS_CAP    0
71 #define CMI_PUMPLOCALTRANSACTIONS_CAP     0
72
73 #if CMI_SENDBUFFERSMSG_CAP
74 int     SendBufferMsg_cap  = 20;
75 #endif
76
77 #if CMI_PUMPNETWORKSMSG_CAP
78 int     PumpNetworkSmsg_cap = 20;
79 #endif
80
81 #if CMI_PUMPREMOTETRANSACTIONS_CAP
82 int     PumpRemoteTransactions_cap = 20;
83 #endif
84
85 #if CMI_PUMPREMOTETRANSACTIONS_CAP
86 int     PumpLocalTransactions_cap = 15;
87 #endif
88
89 #if CMI_EXERT_SEND_LARGE_CAP
90 static int SEND_large_cap = 20;
91 static int SEND_large_pending = 0;
92 #endif
93
94 #if CMI_EXERT_RECV_RDMA_CAP
95 static int   RDMA_cap =   10;
96 static int   RDMA_pending = 0;
97 #endif
98
99 #define USE_LRTS_MEMPOOL                  1
100
101 #define PRINT_SYH                         0
102
103 // Trace communication thread
104 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
105 #define TRACE_THRESHOLD     0.00001
106 #define CMI_MPI_TRACE_MOREDETAILED 0
107 #undef CMI_MPI_TRACE_USEREVENTS
108 #define CMI_MPI_TRACE_USEREVENTS 1
109 #else
110 #undef CMK_SMP_TRACE_COMMTHREAD
111 #define CMK_SMP_TRACE_COMMTHREAD 0
112 #endif
113
114 #define CMK_TRACE_COMMOVERHEAD 0
115 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
116 #undef CMI_MPI_TRACE_USEREVENTS
117 #define CMI_MPI_TRACE_USEREVENTS 1
118 #else
119 #undef CMK_TRACE_COMMOVERHEAD
120 #define CMK_TRACE_COMMOVERHEAD 0
121 #endif
122
123 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
124 CpvStaticDeclare(double, projTraceStart);
125 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
126 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
127 #define  EVENT_TIME()   CpvAccess(projTraceStart)
128 #else
129 #define  START_EVENT()
130 #define  END_EVENT(x)
131 #define  EVENT_TIME()   (0.0)
132 #endif
133
134 #if USE_LRTS_MEMPOOL
135
136 #define oneMB (1024ll*1024)
137 #define oneGB (1024ll*1024*1024)
138
139 static CmiInt8 _mempool_size = 8*oneMB;
140 static CmiInt8 _expand_mem =  4*oneMB;
141 static CmiInt8 _mempool_size_limit = 0;
142
143 static CmiInt8 _totalmem = 0.8*oneGB;
144
145 #if LARGEPAGE
146 static CmiInt8 BIG_MSG  =  16*oneMB;
147 static CmiInt8 ONE_SEG  =  4*oneMB;
148 #else
149 static CmiInt8 BIG_MSG  =  4*oneMB;
150 static CmiInt8 ONE_SEG  =  2*oneMB;
151 #endif
152 #if MULTI_THREAD_SEND
153 static int BIG_MSG_PIPELINE = 1;
154 #else
155 static int BIG_MSG_PIPELINE = 4;
156 #endif
157
158 // dynamic flow control
159 static CmiInt8 buffered_send_msg = 0;
160 static CmiInt8 register_memory_size = 0;
161
162 #if LARGEPAGE
163 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
164 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
165 static CmiInt8  register_count = 0;
166 #else
167 #if CMK_SMP && COMM_THREAD_SEND 
168 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
169 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
170 #else
171 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
172 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
173 #endif
174
175
176 #endif
177
178 #endif     /* end USE_LRTS_MEMPOOL */
179
180 #if MULTI_THREAD_SEND 
181 #define     CMI_GNI_LOCK(x)       CmiLock(x);
182 #define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
183 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
184 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
185 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
186 #else
187 #define     CMI_GNI_LOCK(x)
188 #define     CMI_GNI_TRYLOCK(x)         (0)
189 #define     CMI_GNI_UNLOCK(x)
190 #define     CMI_PCQUEUEPOP_LOCK(Q)   
191 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
192 #endif
193
194 static int _tlbpagesize = 4096;
195
196 //static int _smpd_count  = 0;
197
198 static int   user_set_flag  = 0;
199
200 static int _checkProgress = 1;             /* check deadlock */
201 static int _detected_hang = 0;
202
203 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
204
205 // dynamic SMSG
206 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
207
208 static int avg_smsg_connection = 32;
209 static int                 *smsg_connected_flag= 0;
210 static gni_smsg_attr_t     **smsg_attr_vector_local;
211 static gni_smsg_attr_t     **smsg_attr_vector_remote;
212 static gni_ep_handle_t     ep_hndl_unbound;
213 static gni_smsg_attr_t     send_smsg_attr;
214 static gni_smsg_attr_t     recv_smsg_attr;
215
216 typedef struct _dynamic_smsg_mailbox{
217    void     *mailbox_base;
218    int      size;
219    int      offset;
220    gni_mem_handle_t  mem_hndl;
221    struct      _dynamic_smsg_mailbox  *next;
222 }dynamic_smsg_mailbox_t;
223
224 static dynamic_smsg_mailbox_t  *mailbox_list;
225
226 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
227 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
228
229 #if PRINT_SYH
230 int         lrts_send_msg_id = 0;
231 int         lrts_local_done_msg = 0;
232 int         lrts_send_rdma_success = 0;
233 #endif
234
235 #include "machine.h"
236
237 #include "pcqueue.h"
238
239 #include "mempool.h"
240
241 #if CMK_PERSISTENT_COMM
242 #include "machine-persistent.h"
243 #define  POST_HIGHPRIORITY_RDMA    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
244 #else  
245 #define  POST_HIGHPRIORITY_RDMA   
246 #endif
247
248 #if REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM) 
249 #define  PUMP_REMOTE_HIGHPRIORITY    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
250 #else
251 #define  PUMP_REMOTE_HIGHPRIORITY
252 #endif
253
254 //#define  USE_ONESIDED 1
255 #ifdef USE_ONESIDED
256 //onesided implementation is wrong, since no place to restore omdh
257 #include "onesided.h"
258 onesided_hnd_t   onesided_hnd;
259 onesided_md_t    omdh;
260 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
261
262 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
263
264 #else
265 uint8_t   onesided_hnd, omdh;
266
267 #if REMOTE_EVENT || CQWRITE 
268 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
269     if(register_memory_size+size>= MAX_REG_MEM) { \
270         status = GNI_RC_ERROR_NOMEM;} \
271     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
272         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
273 #else
274 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
275         if (register_memory_size + size >= MAX_REG_MEM) { \
276             status = GNI_RC_ERROR_NOMEM; \
277         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
278             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
279 #endif
280
281 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
282     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
283              register_memory_size -= size; \
284          else CmiAbort("MEM_DEregister");  \
285     } while (0)
286 #endif
287
288 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
289 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
290 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
291 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
292 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
293 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
294 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
295 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
296 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
297 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
298 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
299 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
300 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
301 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
302
303 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
304 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
305
306 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
307 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
308 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
309 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
310
311 #define ALIGNBUF                64
312
313 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
314 /* If SMSG is not used */
315
316 #define FMA_PER_CORE  1024
317 #define FMA_BUFFER_SIZE 1024
318
319 /* If SMSG is used */
320 static int  SMSG_MAX_MSG = 1024;
321 #define SMSG_MAX_CREDIT    72
322
323 #define MSGQ_MAXSIZE       2048
324
325 /* large message transfer with FMA or BTE */
326 #if ! REMOTE_EVENT
327 #define LRTS_GNI_RDMA_THRESHOLD  1024 
328 #else
329    /* remote events only work with RDMA */
330 #define LRTS_GNI_RDMA_THRESHOLD  0 
331 #endif
332
333 #if CMK_SMP
334 static int  REMOTE_QUEUE_ENTRIES=163840; 
335 static int LOCAL_QUEUE_ENTRIES=163840; 
336 #else
337 static int  REMOTE_QUEUE_ENTRIES=20480;
338 static int LOCAL_QUEUE_ENTRIES=20480; 
339 #endif
340
341 #define BIG_MSG_TAG             0x26
342 #define PUT_DONE_TAG            0x28
343 #define DIRECT_PUT_DONE_TAG     0x29
344 #define ACK_TAG                 0x30
345 /* SMSG is data message */
346 #define SMALL_DATA_TAG          0x31
347 /* SMSG is a control message to initialize a BTE */
348 #define LMSG_INIT_TAG           0x33 
349 #define LMSG_OOB_INIT_TAG       0x35
350
351 #define DEBUG
352 #ifdef GNI_RC_CHECK
353 #undef GNI_RC_CHECK
354 #endif
355 #ifdef DEBUG
356 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
357 #else
358 #define GNI_RC_CHECK(msg,rc)
359 #endif
360
361 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
362 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
363 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
364
365 static int useStaticMSGQ = 0;
366 static int useStaticFMA = 0;
367 static int mysize, myrank;
368 static gni_nic_handle_t   nic_hndl;
369
370 typedef struct {
371     gni_mem_handle_t mdh;
372     uint64_t addr;
373 } mdh_addr_t ;
374 // this is related to dynamic SMSG
375
376 typedef struct mdh_addr_list{
377     gni_mem_handle_t mdh;
378    void *addr;
379     struct mdh_addr_list *next;
380 }mdh_addr_list_t;
381
382 static unsigned int         smsg_memlen;
383 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
384 mdh_addr_t          setup_mem;
385 mdh_addr_t          *smsg_connection_vec = 0;
386 gni_mem_handle_t    smsg_connection_memhndl;
387 static int          smsg_expand_slots = 10;
388 static int          smsg_available_slot = 0;
389 static void         *smsg_mailbox_mempool = 0;
390 mdh_addr_list_t     *smsg_dynamic_list = 0;
391
392 static void             *smsg_mailbox_base;
393 gni_msgq_attr_t         msgq_attrs;
394 gni_msgq_handle_t       msgq_handle;
395 gni_msgq_ep_attr_t      msgq_ep_attrs;
396 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
397
398 /* =====Beginning of Declarations of Machine Specific Variables===== */
399 static int cookie;
400 static int modes = 0;
401 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
402 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
403 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
404 static gni_cq_handle_t       highprior_rdma_tx_cqh = NULL;      // rdma - local event
405 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
406 static gni_cq_handle_t       highpriority_rx_cqh = NULL;      // mempool - remote event
407 static gni_ep_handle_t       *ep_hndl_array;
408
409 static CmiNodeLock           *ep_lock_array;
410 static CmiNodeLock           default_tx_cq_lock; 
411 static CmiNodeLock           rdma_tx_cq_lock; 
412 static CmiNodeLock           global_gni_lock; 
413 static CmiNodeLock           rx_cq_lock;
414 static CmiNodeLock           smsg_mailbox_lock;
415 static CmiNodeLock           smsg_rx_cq_lock;
416 static CmiNodeLock           *mempool_lock;
417 //#define     CMK_WITH_STATS      1
418 typedef struct msg_list
419 {
420     uint32_t destNode;
421     uint32_t size;
422     void *msg;
423     uint8_t tag;
424 #if CMK_WITH_STATS
425     double  creation_time;
426 #endif
427 }MSG_LIST;
428
429
430 typedef struct control_msg
431 {
432     uint64_t            source_addr;    /* address from the start of buffer  */
433     uint64_t            dest_addr;      /* address from the start of buffer */
434     int                 total_length;   /* total length */
435     int                 length;         /* length of this packet */
436 #if REMOTE_EVENT
437     int                 ack_index;      /* index from integer to address */
438 #endif
439     uint8_t             seq_id;         //big message   0 meaning single message
440     gni_mem_handle_t    source_mem_hndl;
441     struct control_msg *next;
442 } CONTROL_MSG;
443
444 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
445
446 typedef struct ack_msg
447 {
448     uint64_t            source_addr;    /* address from the start of buffer  */
449 #if ! USE_LRTS_MEMPOOL
450     gni_mem_handle_t    source_mem_hndl;
451     int                 length;          /* total length */
452 #endif
453     struct ack_msg     *next;
454 } ACK_MSG;
455
456 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
457
458 #if CMK_DIRECT
459 typedef struct{
460     uint64_t    handler_addr;
461 }CMK_DIRECT_HEADER;
462
463 typedef struct {
464     char core[CmiMsgHeaderSizeBytes];
465     uint64_t handler;
466 }cmidirectMsg;
467
468 //SYH
469 CpvDeclare(int, CmiHandleDirectIdx);
470 void CmiHandleDirectMsg(cmidirectMsg* msg)
471 {
472
473     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
474    (*(_handle->callbackFnPtr))(_handle->callbackData);
475    CmiFree(msg);
476 }
477
478 void CmiDirectInit()
479 {
480     CpvInitialize(int,  CmiHandleDirectIdx);
481     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
482 }
483
484 #endif
485 typedef struct  rmda_msg
486 {
487     int                   destNode;
488 #if REMOTE_EVENT
489     int                   ack_index;
490 #endif
491     gni_post_descriptor_t *pd;
492 }RDMA_REQUEST;
493
494
495 #define SMP_LOCKS                       1
496 #define ONE_SEND_QUEUE                  0
497 typedef PCQueue BufferList;
498 typedef struct  msg_list_index
499 {
500     PCQueue       sendSmsgBuf;
501 #if  SMP_LOCKS
502     CmiNodeLock   lock;
503     int           pushed;
504     int           destpe;
505 #endif
506 } MSG_LIST_INDEX;
507 char                *destpe_avail;
508 PCQueue sendRdmaBuf;
509 PCQueue sendHighPriorBuf;
510 // buffered send queue
511 #if ! ONE_SEND_QUEUE
512 typedef struct smsg_queue
513 {
514     MSG_LIST_INDEX   *smsg_msglist_index;
515     int               smsg_head_index;
516 #if  SMP_LOCKS
517     PCQueue     nonEmptyQueues;
518 #endif
519 } SMSG_QUEUE;
520 #else
521 typedef struct smsg_queue
522 {
523     PCQueue       sendMsgBuf;
524 }  SMSG_QUEUE;
525 #endif
526
527 SMSG_QUEUE                  smsg_queue;
528 #if CMK_USE_OOB
529 SMSG_QUEUE                  smsg_oob_queue;
530 #define SEND_OOB_SMSG(x)            SendBufferMsg(&x, NULL);
531 #define PUMP_LOCAL_HIGHPRIORITY    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh,  rdma_tx_cq_lock)); 
532 #else
533 #define SEND_OOB_SMSG(x)            
534 #define PUMP_LOCAL_HIGHPRIORITY     
535 #endif
536
537 #define FreeMsgList(d)   free(d);
538 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
539
540 #define FreeControlMsg(d)      free(d);
541 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
542
543 #define FreeAckMsg(d)      free(d);
544 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
545
546 #define FreeRdmaRequest(d)       free(d);
547 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
548 /* reuse gni_post_descriptor_t */
549 static gni_post_descriptor_t *post_freelist=0;
550
551 #define FreePostDesc(d)     free(d);
552 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
553
554
555 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
556 static int      buffered_smsg_counter = 0;
557
558 /* SmsgSend return success but message sent is not confirmed by remote side */
559 static MSG_LIST *buffered_fma_head = 0;
560 static MSG_LIST *buffered_fma_tail = 0;
561
562 /* functions  */
563 #define IsFree(a,ind)  !( a& (1<<(ind) ))
564 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
565 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
566
567 CpvDeclare(mempool_type*, mempool);
568
569 #if CMK_PERSISTENT_COMM
570 CpvDeclare(mempool_type*, persistent_mempool);
571 #endif
572
573 #if REMOTE_EVENT
574 /* ack pool for remote events */
575
576 static int  SHIFT   =           18;
577 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
578 #define RANK_MASK               ((1<<SHIFT) - 1)
579 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
580
581 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
582 #define GET_RANK(evt)           ((evt) & RANK_MASK)
583 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
584
585 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
586
587 #if CMK_SMP
588 #define INIT_SIZE                4096
589 #else
590 #define INIT_SIZE                1024
591 #endif
592
593 struct IndexStruct {
594 void *addr;
595 int next;
596 int type;     // 1: ACK   2: Persistent
597 };
598
599 typedef struct IndexPool {
600     struct IndexStruct   *indexes;
601     int                   size;
602     int                   freehead;
603     CmiNodeLock           lock;
604 } IndexPool;
605
606 static IndexPool  ackPool;
607 #if CMK_PERSISTENT_COMM
608 static IndexPool  persistPool;
609 #endif
610
611 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
612 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
613
614 static void IndexPool_init(IndexPool *pool)
615 {
616     int i;
617     if ((1<<SHIFT) < mysize) 
618         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
619     pool->size = INIT_SIZE;
620     if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
621     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
622     for (i=0; i<pool->size-1; i++) {
623         pool->indexes[i].next = i+1;
624         pool->indexes[i].type = 0;
625     }
626     pool->indexes[i].next = -1;
627     pool->freehead = 0;
628 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
629     pool->lock  = CmiCreateLock();
630 #else
631     pool->lock  = 0;
632 #endif
633 }
634
635 static
636 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
637 {
638     int s, i;
639 #if MULTI_THREAD_SEND  
640     CmiLock(pool->lock);
641 #endif
642     s = pool->freehead;
643     if (s == -1) {
644         int newsize = pool->size * 2;
645         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
646         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
647         struct IndexStruct *old_ackpool = pool->indexes;
648         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
649         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
650         for (i=pool->size; i<newsize-1; i++) {
651             pool->indexes[i].next = i+1;
652             pool->indexes[i].type = 0;
653         }
654         pool->indexes[i].next = -1;
655         pool->indexes[i].type = 0;
656         pool->freehead = pool->size;
657         s = pool->size;
658         pool->size = newsize;
659         free(old_ackpool);
660     }
661     pool->freehead = pool->indexes[s].next;
662     pool->indexes[s].addr = addr;
663     CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
664     pool->indexes[s].type = type;
665 #if MULTI_THREAD_SEND
666     CmiUnlock(pool->lock);
667 #endif
668     return s;
669 }
670
671 static
672 inline  void IndexPool_freeslot(IndexPool *pool, int s)
673 {
674     CmiAssert(s>=0 && s<pool->size);
675 #if MULTI_THREAD_SEND
676     CmiLock(pool->lock);
677 #endif
678     pool->indexes[s].next = pool->freehead;
679     pool->indexes[s].type = 0;
680     pool->freehead = s;
681 #if MULTI_THREAD_SEND
682     CmiUnlock(pool->lock);
683 #endif
684 }
685
686
687 #endif
688
689 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
690 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
691 #define CHARM_MAGIC_NUMBER               126
692
693 #if CMK_ERROR_CHECKING
694 extern unsigned char computeCheckSum(unsigned char *data, int len);
695 static int checksum_flag = 0;
696 #define CMI_SET_CHECKSUM(msg, len)      \
697         if (checksum_flag)  {   \
698           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
699           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
700         }
701 #define CMI_CHECK_CHECKSUM(msg, len)    \
702         if (checksum_flag)      \
703           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
704             CmiAbort("Fatal error: checksum doesn't agree!\n");
705 #else
706 #define CMI_SET_CHECKSUM(msg, len)
707 #define CMI_CHECK_CHECKSUM(msg, len)
708 #endif
709 /* =====End of Definitions of Message-Corruption Related Macros=====*/
710
711 static int print_stats = 0;
712 static int stats_off = 0;
713 void CmiTurnOnStats()
714 {
715     stats_off = 0;
716     //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
717 }
718
719 void CmiTurnOffStats()
720 {
721     stats_off = 1;
722 }
723
724 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
725
726 #if CMK_WITH_STATS
727 FILE *counterLog = NULL;
728 typedef struct comm_thread_stats
729 {
730     uint64_t  smsg_data_count;
731     uint64_t  lmsg_init_count;
732     uint64_t  ack_count;
733     uint64_t  big_msg_ack_count;
734     uint64_t  smsg_count;
735     uint64_t  direct_put_done_count;
736     uint64_t  put_done_count;
737     //times of calling SmsgSend
738     uint64_t  try_smsg_data_count;
739     uint64_t  try_lmsg_init_count;
740     uint64_t  try_ack_count;
741     uint64_t  try_big_msg_ack_count;
742     uint64_t  try_direct_put_done_count;
743     uint64_t  try_put_done_count;
744     uint64_t  try_smsg_count;
745     
746     double    max_time_in_send_buffered_smsg;
747     double    all_time_in_send_buffered_smsg;
748
749     uint64_t  rdma_get_count, rdma_put_count;
750     uint64_t  try_rdma_get_count, try_rdma_put_count;
751     double    max_time_from_control_to_rdma_init;
752     double    all_time_from_control_to_rdma_init;
753
754     double    max_time_from_rdma_init_to_rdma_done;
755     double    all_time_from_rdma_init_to_rdma_done;
756
757     int      count_in_PumpNetwork;
758     double   time_in_PumpNetwork;
759     double   max_time_in_PumpNetwork;
760     int      count_in_SendBufferMsg_smsg;
761     double   time_in_SendBufferMsg_smsg;
762     double   max_time_in_SendBufferMsg_smsg;
763     int      count_in_SendRdmaMsg;
764     double   time_in_SendRdmaMsg;
765     double   max_time_in_SendRdmaMsg;
766     int      count_in_PumpRemoteTransactions;
767     double   time_in_PumpRemoteTransactions;
768     double   max_time_in_PumpRemoteTransactions;
769     int      count_in_PumpLocalTransactions_rdma;
770     double   time_in_PumpLocalTransactions_rdma;
771     double   max_time_in_PumpLocalTransactions_rdma;
772     int      count_in_PumpDatagramConnection;
773     double   time_in_PumpDatagramConnection;
774     double   max_time_in_PumpDatagramConnection;
775 } Comm_Thread_Stats;
776
777 static Comm_Thread_Stats   comm_stats;
778
779 static char *counters_dirname = "counters";
780
781 static void init_comm_stats()
782 {
783   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
784   if (print_stats){
785       char ln[200];
786       int code = mkdir(counters_dirname, 00777); 
787       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
788       counterLog=fopen(ln,"w");
789       if (counterLog == NULL) CmiAbort("Counter files open failed");
790   }
791 }
792
793 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
794
795 #define SMSG_SENT_DONE(creation_time, tag)  \
796         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
797             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++;  \
798             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
799             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
800             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
801             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
802             comm_stats.smsg_count++; \
803             double inbuff_time = CmiWallTimer() - creation_time;   \
804             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
805             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
806         }
807
808 #define SMSG_TRY_SEND(tag)  \
809         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
810             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
811             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
812             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
813             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
814             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
815             comm_stats.try_smsg_count++; \
816         }
817
818 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
819
820 #define  RDMA_TRANS_DONE(x)      \
821          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
822              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
823              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
824          }
825
826 #define  RDMA_TRANS_INIT(type, x)      \
827          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
828              double rdma_trans_time = CmiWallTimer() - x ; \
829              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
830              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
831          }
832
833 #define STATS_PUMPNETWORK_TIME(x)   \
834         { double t = CmiWallTimer(); \
835           x;        \
836           t = CmiWallTimer() - t;          \
837           comm_stats.count_in_PumpNetwork++;        \
838           comm_stats.time_in_PumpNetwork += t;   \
839           if (t>comm_stats.max_time_in_PumpNetwork)      \
840               comm_stats.max_time_in_PumpNetwork = t;    \
841         }
842
843 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
844         { double t = CmiWallTimer(); \
845           x;        \
846           t = CmiWallTimer() - t;          \
847           comm_stats.count_in_PumpRemoteTransactions ++;        \
848           comm_stats.time_in_PumpRemoteTransactions += t;   \
849           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
850               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
851         }
852
853 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
854         { double t = CmiWallTimer(); \
855           x;        \
856           t = CmiWallTimer() - t;          \
857           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
858           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
859           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
860               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
861         }
862
863 #define STATS_SEND_SMSGS_TIME(x)   \
864         { double t = CmiWallTimer(); \
865           x;        \
866           t = CmiWallTimer() - t;          \
867           comm_stats.count_in_SendBufferMsg_smsg ++;        \
868           comm_stats.time_in_SendBufferMsg_smsg += t;   \
869           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
870               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
871         }
872
873 #define STATS_SENDRDMAMSG_TIME(x)   \
874         { double t = CmiWallTimer(); \
875           x;        \
876           t = CmiWallTimer() - t;          \
877           comm_stats.count_in_SendRdmaMsg ++;        \
878           comm_stats.time_in_SendRdmaMsg += t;   \
879           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
880               comm_stats.max_time_in_SendRdmaMsg = t;    \
881         }
882
883 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
884         { double t = CmiWallTimer(); \
885           x;        \
886           t = CmiWallTimer() - t;          \
887           comm_stats.count_in_PumpDatagramConnection ++;        \
888           comm_stats.time_in_PumpDatagramConnection += t;   \
889           if (t>comm_stats.max_time_in_PumpDatagramConnection)      \
890               comm_stats.max_time_in_PumpDatagramConnection = t;    \
891         }
892
893 static void print_comm_stats()
894 {
895     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
896     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
897             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
898             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
899     
900     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
901             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
902             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
903
904     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
905     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
906     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
907
908
909     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
910     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
911     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
912     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
913     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
914     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
915     if (useDynamicSMSG)
916     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
917
918     fclose(counterLog);
919 }
920
921 #else
922 #define STATS_PUMPNETWORK_TIME(x)                  x
923 #define STATS_SEND_SMSGS_TIME(x)                   x
924 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
925 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
926 #define STATS_SENDRDMAMSG_TIME(x)                  x
927 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
928 #endif
929
930 static void
931 allgather(void *in,void *out, int len)
932 {
933     static int *ivec_ptr=NULL,already_called=0,job_size=0;
934     int i,rc;
935     int my_rank;
936     char *tmp_buf,*out_ptr;
937
938     if(!already_called) {
939
940         rc = PMI_Get_size(&job_size);
941         CmiAssert(rc == PMI_SUCCESS);
942         rc = PMI_Get_rank(&my_rank);
943         CmiAssert(rc == PMI_SUCCESS);
944
945         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
946         CmiAssert(ivec_ptr != NULL);
947
948         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
949         CmiAssert(rc == PMI_SUCCESS);
950
951         already_called = 1;
952
953     }
954
955     tmp_buf = (char *)malloc(job_size * len);
956     CmiAssert(tmp_buf);
957
958     rc = PMI_Allgather(in,tmp_buf,len);
959     CmiAssert(rc == PMI_SUCCESS);
960
961     out_ptr = out;
962
963     for(i=0;i<job_size;i++) {
964
965         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
966
967     }
968
969     free(tmp_buf);
970 }
971
972 static void
973 allgather_2(void *in,void *out, int len)
974 {
975     //PMI_Allgather is out of order
976     int i,rc, extend_len;
977     int  rank_index;
978     char *out_ptr, *out_ref;
979     char *in2;
980
981     extend_len = sizeof(int) + len;
982     in2 = (char*)malloc(extend_len);
983
984     memcpy(in2, &myrank, sizeof(int));
985     memcpy(in2+sizeof(int), in, len);
986
987     out_ptr = (char*)malloc(mysize*extend_len);
988
989     rc = PMI_Allgather(in2, out_ptr, extend_len);
990     GNI_RC_CHECK("allgather", rc);
991
992     out_ref = out;
993
994     for(i=0;i<mysize;i++) {
995         //rank index 
996         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
997         //copy to the rank index slot
998         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
999     }
1000
1001     free(out_ptr);
1002     free(in2);
1003
1004 }
1005
1006 static unsigned int get_gni_nic_address(int device_id)
1007 {
1008     unsigned int address, cpu_id;
1009     gni_return_t status;
1010     int i, alps_dev_id=-1,alps_address=-1;
1011     char *token, *p_ptr;
1012
1013     p_ptr = getenv("PMI_GNI_DEV_ID");
1014     if (!p_ptr) {
1015         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1016        
1017         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1018     } else {
1019         while ((token = strtok(p_ptr,":")) != NULL) {
1020             alps_dev_id = atoi(token);
1021             if (alps_dev_id == device_id) {
1022                 break;
1023             }
1024             p_ptr = NULL;
1025         }
1026         CmiAssert(alps_dev_id != -1);
1027         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1028         CmiAssert(p_ptr != NULL);
1029         i = 0;
1030         while ((token = strtok(p_ptr,":")) != NULL) {
1031             if (i == alps_dev_id) {
1032                 alps_address = atoi(token);
1033                 break;
1034             }
1035             p_ptr = NULL;
1036             ++i;
1037         }
1038         CmiAssert(alps_address != -1);
1039         address = alps_address;
1040     }
1041     return address;
1042 }
1043
1044 static uint8_t get_ptag(void)
1045 {
1046     char *p_ptr, *token;
1047     uint8_t ptag;
1048
1049     p_ptr = getenv("PMI_GNI_PTAG");
1050     CmiAssert(p_ptr != NULL);
1051     token = strtok(p_ptr, ":");
1052     ptag = (uint8_t)atoi(token);
1053     return ptag;
1054         
1055 }
1056
1057 static uint32_t get_cookie(void)
1058 {
1059     uint32_t cookie;
1060     char *p_ptr, *token;
1061
1062     p_ptr = getenv("PMI_GNI_COOKIE");
1063     CmiAssert(p_ptr != NULL);
1064     token = strtok(p_ptr, ":");
1065     cookie = (uint32_t)atoi(token);
1066
1067     return cookie;
1068 }
1069
1070 #if LARGEPAGE
1071
1072 /* directly mmap memory from hugetlbfs for large pages */
1073
1074 #include <sys/stat.h>
1075 #include <fcntl.h>
1076 #include <sys/mman.h>
1077 #include <hugetlbfs.h>
1078
1079 // size must be _tlbpagesize aligned
1080 void *my_get_huge_pages(size_t size)
1081 {
1082     char filename[512];
1083     int fd;
1084     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1085     void *ptr = NULL;
1086
1087     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1088     fd = open(filename, O_RDWR | O_CREAT, mode);
1089     if (fd == -1) {
1090         CmiAbort("my_get_huge_pages: open filed");
1091     }
1092     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1093     if (ptr == MAP_FAILED) ptr = NULL;
1094 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1095     close(fd);
1096     unlink(filename);
1097     return ptr;
1098 }
1099
1100 void my_free_huge_pages(void *ptr, int size)
1101 {
1102 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1103     int ret = munmap(ptr, size);
1104     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1105 }
1106
1107 #endif
1108
1109 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1110 /* TODO: add any that are related */
1111 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1112
1113
1114 #include "machine-lrts.h"
1115 #include "machine-common-core.c"
1116
1117
1118 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
1119 static void SendRdmaMsg(PCQueue );
1120 static void PumpNetworkSmsg();
1121 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1122 #if CQWRITE
1123 static void PumpCqWriteTransactions();
1124 #endif
1125 #if REMOTE_EVENT
1126 static void PumpRemoteTransactions(gni_cq_handle_t);
1127 #endif
1128
1129 #if MACHINE_DEBUG_LOG
1130 FILE *debugLog = NULL;
1131 static CmiInt8 buffered_recv_msg = 0;
1132 int         lrts_smsg_success = 0;
1133 int         lrts_received_msg = 0;
1134 #endif
1135
1136 static void sweep_mempool(mempool_type *mptr)
1137 {
1138     int n = 0;
1139     block_header *current = &(mptr->block_head);
1140
1141     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1142     while( current!= NULL) {
1143         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1144         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1145     }
1146     printf("[n %d] sweep_mempool slot END.\n", myrank);
1147 }
1148
1149 inline
1150 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1151 {
1152     block_header *current = *from;
1153
1154     //while(register_memory_size>= MAX_REG_MEM)
1155     //{
1156         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1157             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1158
1159         *from = current;
1160         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1161         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1162         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1163     //}
1164     return GNI_RC_SUCCESS;
1165 }
1166
1167 inline 
1168 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1169 {
1170     gni_return_t status = GNI_RC_SUCCESS;
1171     //int size = GetMempoolsize(msg);
1172     //void *blockaddr = GetMempoolBlockPtr(msg);
1173     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1174    
1175     block_header *current = &(mptr->block_head);
1176     while(register_memory_size>= MAX_REG_MEM)
1177     {
1178         status = deregisterMemory(mptr, &current);
1179         if (status != GNI_RC_SUCCESS) break;
1180     }
1181     if(register_memory_size>= MAX_REG_MEM) return status;
1182
1183     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1184     while(1)
1185     {
1186         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1187         if(status == GNI_RC_SUCCESS)
1188         {
1189             break;
1190         }
1191         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1192         {
1193             GNI_RC_CHECK("registerFromMempool", status);
1194         }
1195         else
1196         {
1197             status = deregisterMemory(mptr, &current);
1198             if (status != GNI_RC_SUCCESS) break;
1199         }
1200     }; 
1201     return status;
1202 }
1203
1204 inline 
1205 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1206 {
1207     static int rank = -1;
1208     int i;
1209     gni_return_t status;
1210     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1211     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1212     mempool_type *mptr;
1213
1214     status = registerFromMempool(mptr1, msg, size, t, cqh);
1215     if (status == GNI_RC_SUCCESS) return status;
1216 #if CMK_SMP 
1217     for (i=0; i<CmiMyNodeSize()+1; i++) {
1218       rank = (rank+1)%(CmiMyNodeSize()+1);
1219       mptr = CpvAccessOther(mempool, rank);
1220       if (mptr == mptr1) continue;
1221       status = registerFromMempool(mptr, msg, size, t, cqh);
1222       if (status == GNI_RC_SUCCESS) return status;
1223     }
1224 #endif
1225     return  GNI_RC_ERROR_RESOURCE;
1226 }
1227
1228 inline
1229 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1230 {
1231     MSG_LIST        *msg_tmp;
1232     MallocMsgList(msg_tmp);
1233     msg_tmp->destNode = destNode;
1234     msg_tmp->size   = size;
1235     msg_tmp->msg    = msg;
1236     msg_tmp->tag    = tag;
1237 #if CMK_WITH_STATS
1238     SMSG_CREATION(msg_tmp)
1239 #endif
1240
1241 #if ONE_SEND_QUEUE
1242     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1243 #else
1244 #if SMP_LOCKS
1245     CmiLock(queue->smsg_msglist_index[destNode].lock);
1246     if(queue->smsg_msglist_index[destNode].pushed == 0)
1247     {
1248         PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1249     }
1250     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1251     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1252 #else
1253     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1254 #endif
1255 #endif
1256
1257 #if PRINT_SYH
1258     buffered_smsg_counter++;
1259 #endif
1260 }
1261
1262 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
1263 {
1264     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1265 }
1266
1267 inline
1268 static void setup_smsg_connection(int destNode)
1269 {
1270     mdh_addr_list_t  *new_entry = 0;
1271     gni_post_descriptor_t *pd;
1272     gni_smsg_attr_t      *smsg_attr;
1273     gni_return_t status = GNI_RC_NOT_DONE;
1274     RDMA_REQUEST        *rdma_request_msg;
1275     
1276     if(smsg_available_slot == smsg_expand_slots)
1277     {
1278         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1279         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1280         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1281
1282         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1283             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1284             GNI_MEM_READWRITE,   
1285             -1,
1286             &(new_entry->mdh));
1287         smsg_available_slot = 0; 
1288         new_entry->next = smsg_dynamic_list;
1289         smsg_dynamic_list = new_entry;
1290     }
1291     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1292     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1293     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1294     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1295     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1296     smsg_attr->buff_size = smsg_memlen;
1297     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1298     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1299     smsg_local_attr_vec[destNode] = smsg_attr;
1300     smsg_available_slot++;
1301     MallocPostDesc(pd);
1302     pd->type            = GNI_POST_FMA_PUT;
1303     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1304     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1305     pd->length          = sizeof(gni_smsg_attr_t);
1306     pd->local_addr      = (uint64_t) smsg_attr;
1307     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1308     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1309     pd->src_cq_hndl     = 0;
1310
1311     pd->rdma_mode       = 0;
1312     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1313     print_smsg_attr(smsg_attr);
1314     if(status == GNI_RC_ERROR_RESOURCE )
1315     {
1316         MallocRdmaRequest(rdma_request_msg);
1317         rdma_request_msg->destNode = destNode;
1318         rdma_request_msg->pd = pd;
1319         /* buffer this request */
1320     }
1321 #if PRINT_SYH
1322     if(status != GNI_RC_SUCCESS)
1323        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1324     else
1325         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1326 #endif
1327 }
1328
1329 /* useDynamicSMSG */
1330 inline 
1331 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1332 {
1333     gni_return_t status = GNI_RC_NOT_DONE;
1334
1335     if(mailbox_list->offset == mailbox_list->size)
1336     {
1337         dynamic_smsg_mailbox_t *new_mailbox_entry;
1338         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1339         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1340         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1341         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1342         new_mailbox_entry->offset = 0;
1343         
1344         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1345             new_mailbox_entry->size, smsg_rx_cqh,
1346             GNI_MEM_READWRITE,   
1347             -1,
1348             &(new_mailbox_entry->mem_hndl));
1349
1350         GNI_RC_CHECK("register", status);
1351         new_mailbox_entry->next = mailbox_list;
1352         mailbox_list = new_mailbox_entry;
1353     }
1354     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1355     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1356     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1357     local_smsg_attr->mbox_offset = mailbox_list->offset;
1358     mailbox_list->offset += smsg_memlen;
1359     local_smsg_attr->buff_size = smsg_memlen;
1360     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1361     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1362 }
1363
1364 /* useDynamicSMSG */
1365 inline 
1366 static int connect_to(int destNode)
1367 {
1368     gni_return_t status = GNI_RC_NOT_DONE;
1369     CmiAssert(smsg_connected_flag[destNode] == 0);
1370     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1371     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1372     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1373     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1374     
1375     CMI_GNI_LOCK(global_gni_lock)
1376     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1377     CMI_GNI_UNLOCK(global_gni_lock)
1378     if (status == GNI_RC_ERROR_RESOURCE) {
1379       /* possibly destNode is making connection at the same time */
1380       free(smsg_attr_vector_local[destNode]);
1381       smsg_attr_vector_local[destNode] = NULL;
1382       free(smsg_attr_vector_remote[destNode]);
1383       smsg_attr_vector_remote[destNode] = NULL;
1384       mailbox_list->offset -= smsg_memlen;
1385 #if PRINT_SYH
1386     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1387 #endif
1388       return 0;
1389     }
1390     GNI_RC_CHECK("GNI_Post", status);
1391     smsg_connected_flag[destNode] = 1;
1392 #if PRINT_SYH
1393     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1394 #endif
1395     return 1;
1396 }
1397
1398 inline 
1399 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1400 {
1401     unsigned int          remote_address;
1402     uint32_t              remote_id;
1403     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1404     gni_smsg_attr_t       *smsg_attr;
1405     gni_post_descriptor_t *pd;
1406     gni_post_state_t      post_state;
1407     char                  *real_data; 
1408
1409     if (useDynamicSMSG) {
1410         switch (smsg_connected_flag[destNode]) {
1411         case 0: 
1412             connect_to(destNode);         /* continue to case 1 */
1413         case 1:                           /* pending connection, do nothing */
1414             status = GNI_RC_NOT_DONE;
1415             if(inbuff ==0)
1416                 buffer_small_msgs(queue, msg, size, destNode, tag);
1417             return status;
1418         }
1419     }
1420 #if ! ONE_SEND_QUEUE
1421     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1422 #endif
1423     {
1424         //CMI_GNI_LOCK(smsg_mailbox_lock)
1425         CMI_GNI_LOCK(default_tx_cq_lock)
1426 #if CMK_SMP_TRACE_COMMTHREAD
1427         int oldpe = -1;
1428         int oldeventid = -1;
1429         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
1430         { 
1431             START_EVENT();
1432             if ( tag == SMALL_DATA_TAG)
1433                 real_data = (char*)msg; 
1434             else 
1435                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1436             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1437             TRACE_COMM_SET_COMM_MSGID(real_data);
1438         }
1439 #endif
1440 #if REMOTE_EVENT
1441         if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) {
1442             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1443             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1444                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1445         }
1446         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1447 #endif
1448 #if     CMK_WITH_STATS
1449         SMSG_TRY_SEND(tag)
1450 #endif
1451 #if CMK_WITH_STATS
1452     double              creation_time;
1453     if (ptr == NULL)
1454         creation_time = CmiWallTimer();
1455     else
1456         creation_time = ptr->creation_time;
1457 #endif
1458
1459     status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1460 #if CMK_SMP_TRACE_COMMTHREAD
1461         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1462 #endif
1463         CMI_GNI_UNLOCK(default_tx_cq_lock)
1464         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1465         if(status == GNI_RC_SUCCESS)
1466         {
1467 #if     CMK_WITH_STATS
1468             SMSG_SENT_DONE(creation_time,tag) 
1469 #endif
1470 #if CMK_SMP_TRACE_COMMTHREAD
1471             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
1472             { 
1473                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1474             }
1475 #endif
1476         }else
1477             status = GNI_RC_ERROR_RESOURCE;
1478     }
1479     if(status != GNI_RC_SUCCESS && inbuff ==0)
1480         buffer_small_msgs(queue, msg, size, destNode, tag);
1481     return status;
1482 }
1483
1484 inline 
1485 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1486 {
1487     /* construct a control message and send */
1488     CONTROL_MSG         *control_msg_tmp;
1489     MallocControlMsg(control_msg_tmp);
1490     control_msg_tmp->source_addr = (uint64_t)msg;
1491     control_msg_tmp->seq_id    = seqno;
1492     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1493 #if REMOTE_EVENT
1494     control_msg_tmp->ack_index    =  -1;
1495 #endif
1496 #if     USE_LRTS_MEMPOOL
1497     if(size < BIG_MSG)
1498     {
1499         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1500     }
1501     else
1502     {
1503         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1504         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1505         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1506     }
1507 #else
1508     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1509 #endif
1510     return control_msg_tmp;
1511 }
1512
1513 #define BLOCKING_SEND_CONTROL    0
1514
1515 // Large message, send control to receiver, receiver register memory and do a GET, 
1516 // return 1 - send no success
1517 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
1518 {
1519     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1520     uint32_t            vmdh_index  = -1;
1521     int                 size;
1522     int                 offset = 0;
1523     uint64_t            source_addr;
1524     int                 register_size; 
1525     void                *msg;
1526
1527     size    =   control_msg_tmp->total_length;
1528     source_addr = control_msg_tmp->source_addr;
1529     register_size = control_msg_tmp->length;
1530
1531 #if  USE_LRTS_MEMPOOL
1532     if( control_msg_tmp->seq_id == 0 ){
1533 #if BLOCKING_SEND_CONTROL
1534         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1535             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1536                 LrtsAdvanceCommunication(0);
1537         }
1538 #endif
1539         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1540         {
1541             msg = (void*)source_addr;
1542             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1543             {
1544                 if(!inbuff)
1545                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1546                 return GNI_RC_ERROR_NOMEM;
1547             }
1548             //register the corresponding mempool
1549             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1550             if(status == GNI_RC_SUCCESS)
1551             {
1552                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1553             }
1554         }else
1555         {
1556             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1557             status = GNI_RC_SUCCESS;
1558         }
1559         if(NoMsgInSend(source_addr))
1560             register_size = GetMempoolsize((void*)(source_addr));
1561         else
1562             register_size = 0;
1563     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1564     {
1565         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1566         source_addr += offset;
1567         size = control_msg_tmp->length;
1568 #if BLOCKING_SEND_CONTROL
1569         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1570             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1571                 LrtsAdvanceCommunication(0);
1572         }
1573 #endif
1574         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1575             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1576             {
1577                 if(!inbuff)
1578                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1579                 return GNI_RC_ERROR_NOMEM;
1580             }
1581             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1582             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1583         }
1584         else
1585         {
1586             status = GNI_RC_SUCCESS;
1587         }
1588         register_size = 0;  
1589     }
1590
1591 #if CMI_EXERT_SEND_LARGE_CAP
1592     if(SEND_large_pending >= SEND_large_cap)
1593     {
1594         status = GNI_RC_ERROR_NOMEM;
1595     }
1596 #endif
1597  
1598     if(status == GNI_RC_SUCCESS)
1599     {
1600        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr); 
1601         if(status == GNI_RC_SUCCESS)
1602         {
1603 #if CMI_EXERT_SEND_LARGE_CAP
1604             SEND_large_pending++;
1605 #endif
1606             buffered_send_msg += register_size;
1607             if(control_msg_tmp->seq_id == 0)
1608             {
1609                 IncreaseMsgInSend(source_addr);
1610             }
1611             FreeControlMsg(control_msg_tmp);
1612             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag); 
1613         }else
1614             status = GNI_RC_ERROR_RESOURCE;
1615
1616     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1617     {
1618         CmiAbort("Memory registor for large msg\n");
1619     }else 
1620     {
1621         status = GNI_RC_ERROR_NOMEM; 
1622         if(!inbuff)
1623             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1624     }
1625     return status;
1626 #else
1627     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1628     if(status == GNI_RC_SUCCESS)
1629     {
1630         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL);  
1631         if(status == GNI_RC_SUCCESS)
1632         {
1633             FreeControlMsg(control_msg_tmp);
1634         }
1635     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1636     {
1637         CmiAbort("Memory registor for large msg\n");
1638     }else 
1639     {
1640         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1641     }
1642     return status;
1643 #endif
1644 }
1645
1646 inline void LrtsPrepareEnvelope(char *msg, int size)
1647 {
1648     CmiSetMsgSize(msg, size);
1649     CMI_SET_CHECKSUM(msg, size);
1650 }
1651
1652 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1653 {
1654     gni_return_t        status  =   GNI_RC_SUCCESS;
1655     uint8_t tag;
1656     CONTROL_MSG         *control_msg_tmp;
1657     int                 oob = ( mode & OUT_OF_BAND);
1658     SMSG_QUEUE          *queue;
1659
1660     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1661 #if CMK_USE_OOB
1662     queue = oob? &smsg_oob_queue : &smsg_queue;
1663     tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
1664 #else
1665     queue = &smsg_queue;
1666     tag = LMSG_INIT_TAG;
1667 #endif
1668
1669     LrtsPrepareEnvelope(msg, size);
1670
1671 #if PRINT_SYH
1672     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1673 #endif 
1674
1675 #if CMK_SMP 
1676     if(size <= SMSG_MAX_MSG)
1677         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1678     else if (size < BIG_MSG) {
1679         control_msg_tmp =  construct_control_msg(size, msg, 0);
1680         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1681     }
1682     else {
1683         CmiSetMsgSeq(msg, 0);
1684         control_msg_tmp =  construct_control_msg(size, msg, 1);
1685         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1686     }
1687 #else   //non-smp, smp(worker sending)
1688     if(size <= SMSG_MAX_MSG)
1689     {
1690         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL))
1691             CmiFree(msg);
1692     }
1693     else if (size < BIG_MSG) {
1694         control_msg_tmp =  construct_control_msg(size, msg, 0);
1695         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1696     }
1697     else {
1698 #if     USE_LRTS_MEMPOOL
1699         CmiSetMsgSeq(msg, 0);
1700         control_msg_tmp =  construct_control_msg(size, msg, 1);
1701         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1702 #else
1703         control_msg_tmp =  construct_control_msg(size, msg, 0);
1704         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1705 #endif
1706     }
1707 #endif
1708     return 0;
1709 }
1710
1711 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1712 {
1713   int i;
1714 #if CMK_BROADCAST_USE_CMIREFERENCE
1715   for(i=0;i<npes;i++) {
1716     if (pes[i] == CmiMyPe())
1717       CmiSyncSend(pes[i], len, msg);
1718     else {
1719       CmiReference(msg);
1720       CmiSyncSendAndFree(pes[i], len, msg);
1721     }
1722   }
1723 #else
1724   for(i=0;i<npes;i++) {
1725     CmiSyncSend(pes[i], len, msg);
1726   }
1727 #endif
1728 }
1729
1730 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1731 {
1732   /* A better asynchronous implementation may be wanted, but at least it works */
1733   CmiSyncListSendFn(npes, pes, len, msg);
1734   return (CmiCommHandle) 0;
1735 }
1736
1737 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1738 {
1739   if (npes == 1) {
1740       CmiSyncSendAndFree(pes[0], len, msg);
1741       return;
1742   }
1743 #if CMK_PERSISTENT_COMM
1744   if (CpvAccess(phs) && len > PERSIST_MIN_SIZE 
1745 #if CMK_SMP
1746             && IS_PERSISTENT_MEMORY(msg)
1747 #endif
1748      ){
1749       int i;
1750       for(i=0;i<npes;i++) {
1751         if (pes[i] == CmiMyPe())
1752           CmiSyncSend(pes[i], len, msg);
1753         else {
1754           CmiReference(msg);
1755           CmiSyncSendAndFree(pes[i], len, msg);
1756         }
1757       }
1758       CmiFree(msg);
1759       return;
1760   }
1761 #endif
1762   
1763 #if CMK_BROADCAST_USE_CMIREFERENCE
1764   CmiSyncListSendFn(npes, pes, len, msg);
1765   CmiFree(msg);
1766 #else
1767   int i;
1768   for(i=0;i<npes-1;i++) {
1769     CmiSyncSend(pes[i], len, msg);
1770   }
1771   if (npes>0)
1772     CmiSyncSendAndFree(pes[npes-1], len, msg);
1773   else 
1774     CmiFree(msg);
1775 #endif
1776 }
1777
1778 static void    PumpDatagramConnection();
1779 static      int         event_SetupConnect = 111;
1780 static      int         event_PumpSmsg = 222 ;
1781 static      int         event_PumpTransaction = 333;
1782 static      int         event_PumpRdmaTransaction = 444;
1783 static      int         event_SendBufferSmsg = 484;
1784 static      int         event_SendFmaRdmaMsg = 555;
1785 static      int         event_AdvanceCommunication = 666;
1786
1787 static void registerUserTraceEvents() {
1788 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1789     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1790     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1791     event_PumpTransaction = traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
1792     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA remote event" , -1);
1793     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1794     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1795     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1796 #endif
1797 }
1798
1799 static void ProcessDeadlock()
1800 {
1801     static CmiUInt8 *ptr = NULL;
1802     static CmiUInt8  last = 0, mysum, sum;
1803     static int count = 0;
1804     gni_return_t status;
1805     int i;
1806
1807 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1808 //sweep_mempool(CpvAccess(mempool));
1809     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1810     mysum = smsg_send_count + smsg_recv_count;
1811     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1812     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1813     GNI_RC_CHECK("PMI_Allgather", status);
1814     sum = 0;
1815     for (i=0; i<mysize; i++)  sum+= ptr[i];
1816     if (last == 0 || sum == last) 
1817         count++;
1818     else
1819         count = 0;
1820     last = sum;
1821     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1822     if (count == 2) { 
1823         /* detected twice, it is a real deadlock */
1824         if (myrank == 0)  {
1825             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1826             CmiAbort("Fatal> Deadlock detected.");
1827         }
1828
1829     }
1830     _detected_hang = 0;
1831 }
1832
1833 static void CheckProgress()
1834 {
1835     if (smsg_send_count == last_smsg_send_count &&
1836         smsg_recv_count == last_smsg_recv_count ) 
1837     {
1838         _detected_hang = 1;
1839 #if !CMK_SMP
1840         if (_detected_hang) ProcessDeadlock();
1841 #endif
1842
1843     }
1844     else {
1845         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1846         last_smsg_send_count = smsg_send_count;
1847         last_smsg_recv_count = smsg_recv_count;
1848         _detected_hang = 0;
1849     }
1850 }
1851
1852 static void set_limit()
1853 {
1854     //if (!user_set_flag && CmiMyRank() == 0) {
1855     if (CmiMyRank() == 0) {
1856         int mynode = CmiPhysicalNodeID(CmiMyPe());
1857         int numpes = CmiNumPesOnPhysicalNode(mynode);
1858         int numprocesses = numpes / CmiMyNodeSize();
1859         MAX_REG_MEM  = _totalmem / numprocesses;
1860         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1861         if (CmiMyPe() == 0)
1862            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1863         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1864         {
1865              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1866              CmiAbort("memory registration\n");
1867         }
1868     }
1869 }
1870
1871 void LrtsPostCommonInit(int everReturn)
1872 {
1873 #if CMK_DIRECT
1874     CmiDirectInit();
1875 #endif
1876 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1877     CpvInitialize(double, projTraceStart);
1878     /* only PE 0 needs to care about registration (to generate sts file). */
1879     //if (CmiMyPe() == 0) 
1880     {
1881         registerMachineUserEventsFunction(&registerUserTraceEvents);
1882     }
1883 #endif
1884
1885 #if CMK_SMP
1886     CmiIdleState *s=CmiNotifyGetState();
1887     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1888     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1889 #else
1890     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1891     if (useDynamicSMSG)
1892     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1893 #endif
1894
1895 #if ! LARGEPAGE
1896     if (_checkProgress)
1897 #if CMK_SMP
1898     if (CmiMyRank() == 0)
1899 #endif
1900     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1901 #endif
1902  
1903 #if !LARGEPAGE
1904     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1905 #endif
1906 }
1907
1908 /* this is called by worker thread */
1909 void LrtsPostNonLocal()
1910 {
1911 #if 1
1912
1913 #if CMK_SMP_TRACE_COMMTHREAD
1914     double startT, endT;
1915 #endif
1916
1917 #if MULTI_THREAD_SEND
1918     if(mysize == 1) return;
1919
1920     if (CmiMyRank() % 6 != 3) return;
1921
1922 #if CMK_SMP_TRACE_COMMTHREAD
1923     traceEndIdle();
1924     startT = CmiWallTimer();
1925 #endif
1926
1927     CmiMachineProgressImpl();
1928
1929 #if CMK_SMP_TRACE_COMMTHREAD
1930     endT = CmiWallTimer();
1931     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
1932     traceBeginIdle();
1933 #endif
1934
1935 #endif
1936 #endif
1937 }
1938
1939 /* Network progress function is used to poll the network when for
1940    messages. This flushes receive buffers on some  implementations*/
1941 #if CMK_MACHINE_PROGRESS_DEFINED
1942 void CmiMachineProgressImpl() {
1943 #if ! CMK_SMP || MULTI_THREAD_SEND
1944
1945     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
1946     SEND_OOB_SMSG(smsg_oob_queue)
1947     PUMP_REMOTE_HIGHPRIORITY
1948     PUMP_LOCAL_HIGHPRIORITY
1949     POST_HIGHPRIORITY_RDMA
1950
1951 #if 0
1952 #if CMK_WORKER_SINGLE_TASK
1953     if (CmiMyRank() % 6 == 0)
1954 #endif
1955     PumpNetworkSmsg();
1956
1957 #if CMK_WORKER_SINGLE_TASK
1958     if (CmiMyRank() % 6 == 1)
1959 #endif
1960     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
1961
1962 #if CMK_WORKER_SINGLE_TASK
1963     if (CmiMyRank() % 6 == 2)
1964 #endif
1965     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
1966
1967 #if REMOTE_EVENT
1968 #if CMK_WORKER_SINGLE_TASK
1969     if (CmiMyRank() % 6 == 3)
1970 #endif
1971     PumpRemoteTransactions(rdma_rx_cqh);         // rdma_rx_cqh
1972 #endif
1973
1974 #if CMK_WORKER_SINGLE_TASK
1975     if (CmiMyRank() % 6 == 4)
1976 #endif
1977     {
1978 #if CMK_USE_OOB
1979     SendBufferMsg(&smsg_oob_queue, NULL);
1980     SendBufferMsg(&smsg_queue, &smsg_oob_queue);
1981 #else
1982     SendBufferMsg(&smsg_queue, NULL);
1983 #endif
1984     }
1985
1986 #if CMK_WORKER_SINGLE_TASK
1987     if (CmiMyRank() % 6 == 5)
1988 #endif
1989 #if CMK_SMP
1990     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
1991 #else
1992     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
1993 #endif
1994
1995 #endif
1996 #endif
1997 }
1998 #endif
1999
2000
2001 /* useDynamicSMSG */
2002 static void    PumpDatagramConnection()
2003 {
2004     uint32_t          remote_address;
2005     uint32_t          remote_id;
2006     gni_return_t status;
2007     gni_post_state_t  post_state;
2008     uint64_t          datagram_id;
2009     int i;
2010
2011    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2012    {
2013        if (datagram_id >= mysize) {           /* bound endpoint */
2014            int pe = datagram_id - mysize;
2015            CMI_GNI_LOCK(global_gni_lock)
2016            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2017            CMI_GNI_UNLOCK(global_gni_lock)
2018            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2019            {
2020                CmiAssert(remote_id == pe);
2021                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2022                GNI_RC_CHECK("Dynamic SMSG Init", status);
2023 #if PRINT_SYH
2024                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2025 #endif
2026                CmiAssert(smsg_connected_flag[pe] == 1);
2027                smsg_connected_flag[pe] = 2;
2028            }
2029        }
2030        else {         /* unbound ep */
2031            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2032            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2033            {
2034                CmiAssert(remote_id<mysize);
2035                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2036                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2037                GNI_RC_CHECK("Dynamic SMSG Init", status);
2038 #if PRINT_SYH
2039                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2040 #endif
2041                smsg_connected_flag[remote_id] = 2;
2042
2043                alloc_smsg_attr(&send_smsg_attr);
2044                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2045                GNI_RC_CHECK("post unbound datagram", status);
2046            }
2047        }
2048    }
2049 }
2050
2051 /* pooling CQ to receive network message */
2052 static void PumpNetworkRdmaMsgs()
2053 {
2054     gni_cq_entry_t      event_data;
2055     gni_return_t        status;
2056
2057 }
2058
2059 inline 
2060 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2061 {
2062     RDMA_REQUEST        *rdma_request_msg;
2063     MallocRdmaRequest(rdma_request_msg);
2064     rdma_request_msg->destNode = inst_id;
2065     rdma_request_msg->pd = pd;
2066 #if REMOTE_EVENT
2067     rdma_request_msg->ack_index = ack_index;
2068 #endif
2069     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2070 }
2071
2072 static void getLargeMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2073
2074 static void PumpNetworkSmsg()
2075 {
2076     uint64_t            inst_id;
2077     gni_cq_entry_t      event_data;
2078     gni_return_t        status;
2079     void                *header;
2080     uint8_t             msg_tag;
2081     int                 msg_nbytes;
2082     void                *msg_data;
2083     gni_mem_handle_t    msg_mem_hndl;
2084     gni_smsg_attr_t     *smsg_attr;
2085     gni_smsg_attr_t     *remote_smsg_attr;
2086     int                 init_flag;
2087     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2088     uint64_t            source_addr;
2089     SMSG_QUEUE         *queue = &smsg_queue;
2090     PCQueue             tmp_queue;
2091 #if  CMK_DIRECT
2092     cmidirectMsg        *direct_msg;
2093 #endif
2094 #if CMI_PUMPNETWORKSMSG_CAP 
2095     int                  recv_cnt = 0;
2096     while(recv_cnt< PumpNetworkSmsg_cap) {
2097 #else
2098     while(1) {
2099 #endif
2100         CMI_GNI_LOCK(smsg_rx_cq_lock)
2101         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2102         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2103         if(status != GNI_RC_SUCCESS) break;
2104
2105         inst_id = GNI_CQ_GET_INST_ID(event_data);
2106 #if REMOTE_EVENT
2107         inst_id = GET_RANK(inst_id);      /* important */
2108 #endif
2109         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2110 #if PRINT_SYH
2111         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2112 #endif
2113         if (useDynamicSMSG) {
2114             /* subtle: smsg may come before connection is setup */
2115             while (smsg_connected_flag[inst_id] != 2) 
2116                PumpDatagramConnection();
2117         }
2118         msg_tag = GNI_SMSG_ANY_TAG;
2119         while(1) {
2120             CMI_GNI_LOCK(smsg_mailbox_lock)
2121             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2122             if (status != GNI_RC_SUCCESS)
2123             {
2124                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2125                 break;
2126             }
2127 #if         CMI_PUMPNETWORKSMSG_CAP
2128             recv_cnt++; 
2129 #endif
2130 #if PRINT_SYH
2131             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2132 #endif
2133             /* copy msg out and then put into queue (small message) */
2134             switch (msg_tag) {
2135             case SMALL_DATA_TAG:
2136             {
2137                 START_EVENT();
2138                 msg_nbytes = CmiGetMsgSize(header);
2139                 msg_data    = CmiAlloc(msg_nbytes);
2140                 memcpy(msg_data, (char*)header, msg_nbytes);
2141                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2142                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2143                 TRACE_COMM_CREATION(EVENT_TIME(), msg_data);
2144                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2145                 handleOneRecvedMsg(msg_nbytes, msg_data);
2146                 break;
2147             }
2148             case LMSG_INIT_TAG:
2149             case LMSG_OOB_INIT_TAG:
2150             {
2151                 tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf; 
2152 #if MULTI_THREAD_SEND
2153                 MallocControlMsg(control_msg_tmp);
2154                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2155                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2156                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2157                 getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
2158                 FreeControlMsg(control_msg_tmp);
2159 #else
2160                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2161                 getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
2162                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2163 #endif
2164                 break;
2165             }
2166 #if !REMOTE_EVENT && !CQWRITE
2167             case ACK_TAG:   //msg fit into mempool
2168             {
2169                 /* Get is done, release message . Now put is not used yet*/
2170                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2171                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2172                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2173 #if ! USE_LRTS_MEMPOOL
2174                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2175 #else
2176                 DecreaseMsgInSend(msg);
2177 #endif
2178                 if(NoMsgInSend(msg))
2179                     buffered_send_msg -= GetMempoolsize(msg);
2180                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2181                 CmiFree(msg);
2182 #if CMI_EXERT_SEND_LARGE_CAP
2183                 SEND_large_pending--;
2184 #endif
2185                 break;
2186             }
2187 #endif
2188             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2189             {
2190 #if MULTI_THREAD_SEND
2191                 MallocControlMsg(header_tmp);
2192                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2193                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2194 #else
2195                 header_tmp = (CONTROL_MSG *) header;
2196 #endif
2197                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2198 #if CMI_EXERT_SEND_LARGE_CAP
2199                 SEND_large_pending--;
2200 #endif
2201                 void *msg = (void*)(header_tmp->source_addr);
2202                 int cur_seq = CmiGetMsgSeq(msg);
2203                 int offset = ONE_SEG*(cur_seq+1);
2204                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2205                 buffered_send_msg -= header_tmp->length;
2206                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2207                 if (remain_size < 0) remain_size = 0;
2208                 CmiSetMsgSize(msg, remain_size);
2209                 if(remain_size <= 0) //transaction done
2210                 {
2211                     CmiFree(msg);
2212                 }else if (header_tmp->total_length > offset)
2213                 {
2214                     CmiSetMsgSeq(msg, cur_seq+1);
2215                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2216                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2217                     //send next seg
2218                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2219                          // pipelining
2220                     if (header_tmp->seq_id == 1) {
2221                       int i;
2222                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2223                         int seq = cur_seq+i+2;
2224                         CmiSetMsgSeq(msg, seq-1);
2225                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2226                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2227                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2228                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2229                       }
2230                     }
2231                 }
2232 #if MULTI_THREAD_SEND
2233                 FreeControlMsg(header_tmp);
2234 #else
2235                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2236 #endif
2237                 break;
2238             }
2239 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2240             case PUT_DONE_TAG:  {   //persistent message
2241                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2242                 int size = ((CONTROL_MSG *) header)->length;
2243                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2244                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2245                 CmiReference(msg);
2246                 CMI_CHECK_CHECKSUM(msg, size);
2247                 handleOneRecvedMsg(size, msg); 
2248 #if PRINT_SYH
2249                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2250 #endif
2251                 break;
2252             }
2253 #endif
2254 #if CMK_DIRECT
2255             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2256                 //create a trigger message
2257                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2258                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2259                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2260                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2261                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2262                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2263                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2264                 break;
2265 #endif
2266             default:
2267                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2268                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2269                 printf("weird tag problem\n");
2270                 CmiAbort("Unknown tag\n");
2271             }               // end switch
2272 #if PRINT_SYH
2273             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2274 #endif
2275             smsg_recv_count ++;
2276             msg_tag = GNI_SMSG_ANY_TAG;
2277         } //endwhile GNI_SmsgGetNextWTag
2278     }   //end while GetEvent
2279     if(status == GNI_RC_ERROR_RESOURCE)
2280     {
2281         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
2282         GNI_RC_CHECK("Smsg_rx_cq full", status);
2283     }
2284 }
2285
2286 static void printDesc(gni_post_descriptor_t *pd)
2287 {
2288     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2289 }
2290
2291 #if CQWRITE
2292 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2293 {
2294     gni_post_descriptor_t *pd;
2295     gni_return_t        status = GNI_RC_SUCCESS;
2296     
2297     MallocPostDesc(pd);
2298     pd->type = GNI_POST_CQWRITE;
2299     pd->cq_mode = GNI_CQMODE_SILENT;
2300     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2301     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2302     pd->cqwrite_value = data;
2303     pd->remote_mem_hndl = mem_hndl;
2304     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2305     GNI_RC_CHECK("GNI_PostCqWrite", status);
2306 }
2307 #endif
2308
2309 // register memory for a message
2310 // return mem handle
2311 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2312 {
2313     gni_return_t status = GNI_RC_SUCCESS;
2314
2315     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2316
2317 #if CMK_PERSISTENT_COMM
2318       // persistent message is always registered
2319       // BIG_MSG small pieces do not have malloc chunk header
2320     if (IS_PERSISTENT_MEMORY(msg)) {
2321         *memh = GetMemHndl(msg);
2322         return GNI_RC_SUCCESS;
2323     }
2324 #endif
2325     if(seqno == 0 
2326 #if CMK_PERSISTENT_COMM
2327          || seqno == PERSIST_SEQ
2328 #endif
2329       )
2330     {
2331         if(IsMemHndlZero((GetMemHndl(msg))))
2332         {
2333             msg = (void*)(msg);
2334             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2335             if(status == GNI_RC_SUCCESS)
2336                 *memh = GetMemHndl(msg);
2337         }
2338         else {
2339             *memh = GetMemHndl(msg);
2340         }
2341     }
2342     else {
2343         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2344         status = registerMemory(msg, size, memh, NULL); 
2345     }
2346     return status;
2347 }
2348
2349 // for BIG_MSG called on receiver side for receiving control message
2350 // LMSG_INIT_TAG
2351 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2352 {
2353 #if     USE_LRTS_MEMPOOL
2354     CONTROL_MSG         *request_msg;
2355     gni_return_t        status = GNI_RC_SUCCESS;
2356     void                *msg_data;
2357     gni_post_descriptor_t *pd;
2358     gni_mem_handle_t    msg_mem_hndl;
2359     int                 size, transaction_size, offset = 0;
2360     size_t              register_size = 0;
2361
2362     // initial a get to transfer data from the sender side */
2363     request_msg = (CONTROL_MSG *) header;
2364     size = request_msg->total_length;
2365     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2366     MallocPostDesc(pd);
2367 #if CMK_WITH_STATS 
2368     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2369 #endif
2370     if(request_msg->seq_id < 2)   {
2371 #if CMK_SMP_TRACE_COMMTHREAD 
2372         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2373 #endif
2374         msg_data = CmiAlloc(size);
2375         CmiSetMsgSeq(msg_data, 0);
2376         _MEMCHECK(msg_data);
2377     }
2378     else {
2379         offset = ONE_SEG*(request_msg->seq_id-1);
2380         msg_data = (char*)request_msg->dest_addr + offset;
2381     }
2382    
2383     pd->cqwrite_value = request_msg->seq_id;
2384
2385     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2386     SetMemHndlZero(pd->local_mem_hndl);
2387     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2388     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2389         if(NoMsgInRecv( (void*)(msg_data)))
2390             register_size = GetMempoolsize((void*)(msg_data));
2391     }
2392
2393     pd->first_operand = ALIGN64(size);                   //  total length
2394
2395     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2396         pd->type            = GNI_POST_FMA_GET;
2397     else
2398         pd->type            = GNI_POST_RDMA_GET;
2399     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2400     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2401     pd->length          = transaction_size;
2402     pd->local_addr      = (uint64_t) msg_data;
2403     pd->remote_addr     = request_msg->source_addr + offset;
2404     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2405
2406     if (tag == LMSG_OOB_INIT_TAG) 
2407         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2408     else
2409     {
2410 #if MULTI_THREAD_SEND
2411         pd->src_cq_hndl     = rdma_tx_cqh;
2412 #else
2413         pd->src_cq_hndl     = 0;
2414 #endif
2415     }
2416
2417     pd->rdma_mode       = 0;
2418     pd->amo_cmd         = 0;
2419 #if CMI_EXERT_RECV_RDMA_CAP
2420     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2421 #endif
2422     //memory registration success
2423     if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
2424     {
2425         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2426         CMI_GNI_LOCK(lock)
2427 #if REMOTE_EVENT
2428         if( request_msg->seq_id == 0)
2429         {
2430             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2431             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2432             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2433         }
2434 #endif
2435
2436 #if CMK_WITH_STATS
2437         RDMA_TRY_SEND(pd->type)
2438 #endif
2439         if(pd->type == GNI_POST_RDMA_GET) 
2440         {
2441             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2442         }
2443         else
2444         {
2445             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2446         }
2447         CMI_GNI_UNLOCK(lock)
2448
2449         if(status == GNI_RC_SUCCESS )
2450         {
2451 #if CMI_EXERT_RECV_RDMA_CAP
2452             RDMA_pending++;
2453 #endif
2454             if(pd->cqwrite_value == 0)
2455             {
2456 #if MACHINE_DEBUG_LOG
2457                 buffered_recv_msg += register_size;
2458                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2459 #endif
2460                 IncreaseMsgInRecv(msg_data);
2461 #if CMK_SMP_TRACE_COMMTHREAD 
2462                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2463 #endif
2464             }
2465 #if  CMK_WITH_STATS
2466             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2467             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2468 #endif
2469         }
2470     }else if (status != GNI_RC_SUCCESS)
2471     {
2472         SetMemHndlZero((pd->local_mem_hndl));
2473     }
2474         if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
2475     {
2476 #if REMOTE_EVENT
2477         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2478 #else
2479         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2480 #endif
2481     }else if (status != GNI_RC_SUCCESS) {
2482         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2483         GNI_RC_CHECK("GetLargeAFter posting", status);
2484     }
2485 #else
2486     CONTROL_MSG         *request_msg;
2487     gni_return_t        status;
2488     void                *msg_data;
2489     gni_post_descriptor_t *pd;
2490     RDMA_REQUEST        *rdma_request_msg;
2491     gni_mem_handle_t    msg_mem_hndl;
2492     //int source;
2493     // initial a get to transfer data from the sender side */
2494     request_msg = (CONTROL_MSG *) header;
2495     msg_data = CmiAlloc(request_msg->length);
2496     _MEMCHECK(msg_data);
2497
2498     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2499
2500     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2501     {
2502         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2503     }
2504
2505     MallocPostDesc(pd);
2506     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2507         pd->type            = GNI_POST_FMA_GET;
2508     else
2509         pd->type            = GNI_POST_RDMA_GET;
2510     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2511     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2512     pd->length          = ALIGN64(request_msg->length);
2513     pd->local_addr      = (uint64_t) msg_data;
2514     pd->remote_addr     = request_msg->source_addr;
2515     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2516     if (tag == LMSG_OOB_INIT_TAG) 
2517         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2518     else
2519     {
2520 #if MULTI_THREAD_SEND
2521         pd->src_cq_hndl     = rdma_tx_cqh;
2522 #else
2523         pd->src_cq_hndl     = 0;
2524 #endif
2525     }
2526     pd->rdma_mode       = 0;
2527     pd->amo_cmd         = 0;
2528
2529     //memory registration successful
2530     if(status == GNI_RC_SUCCESS)
2531     {
2532         pd->local_mem_hndl  = msg_mem_hndl;
2533        
2534         if(pd->type == GNI_POST_RDMA_GET) 
2535         {
2536             CMI_GNI_LOCK(rdma_tx_cq_lock)
2537             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2538             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2539         }
2540         else
2541         {
2542             CMI_GNI_LOCK(default_tx_cq_lock)
2543             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2544             CMI_GNI_UNLOCK(default_tx_cq_lock)
2545         }
2546
2547     }else
2548     {
2549         SetMemHndlZero(pd->local_mem_hndl);
2550     }
2551     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2552     {
2553         MallocRdmaRequest(rdma_request_msg);
2554         rdma_request_msg->next = 0;
2555         rdma_request_msg->destNode = inst_id;
2556         rdma_request_msg->pd = pd;
2557         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2558     }else {
2559         GNI_RC_CHECK("AFter posting", status);
2560     }
2561 #endif
2562 }
2563
2564 #if CQWRITE
2565 static void PumpCqWriteTransactions()
2566 {
2567
2568     gni_cq_entry_t          ev;
2569     gni_return_t            status;
2570     void                    *msg;  
2571     int                     msg_size;
2572     while(1) {
2573         //CMI_GNI_LOCK(my_cq_lock) 
2574         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2575         //CMI_GNI_UNLOCK(my_cq_lock)
2576         if(status != GNI_RC_SUCCESS) break;
2577         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2578 #if CMK_PERSISTENT_COMM
2579 #if PRINT_SYH
2580         printf(" %d CQ write event %p\n", myrank, msg);
2581 #endif
2582         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2583 #if PRINT_SYH
2584             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2585 #endif
2586             CmiReference(msg);
2587             msg_size = CmiGetMsgSize(msg);
2588             CMI_CHECK_CHECKSUM(msg, msg_size);
2589             handleOneRecvedMsg(msg_size, msg); 
2590             continue;
2591         }
2592 #endif
2593 #if ! USE_LRTS_MEMPOOL
2594        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2595 #else
2596         DecreaseMsgInSend(msg);
2597 #endif
2598         if(NoMsgInSend(msg))
2599             buffered_send_msg -= GetMempoolsize(msg);
2600         CmiFree(msg);
2601     };
2602     if(status == GNI_RC_ERROR_RESOURCE)
2603     {
2604         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2605     }
2606 }
2607 #endif
2608
2609 #if REMOTE_EVENT
2610 static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
2611 {
2612     gni_cq_entry_t          ev;
2613     gni_return_t            status;
2614     void                    *msg;   
2615     int                     inst_id, index, type, size;
2616
2617 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2618     int                     pump_count = 0;
2619 #endif
2620     while(1) {
2621 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2622         if (pump_count > PumpRemoteTransactions_cap) break;
2623 #endif
2624         CMI_GNI_LOCK(global_gni_lock)
2625 //        CMI_GNI_LOCK(rdma_tx_cq_lock)
2626         status = GNI_CqGetEvent(rx_cqh, &ev);
2627 //        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2628         CMI_GNI_UNLOCK(global_gni_lock)
2629
2630         if(status != GNI_RC_SUCCESS) break;
2631
2632 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2633         pump_count ++;
2634 #endif
2635
2636         inst_id = GNI_CQ_GET_INST_ID(ev);
2637         index = GET_INDEX(inst_id);
2638         type = GET_TYPE(inst_id);
2639         switch (type) {
2640         case 0:    // ACK
2641             CmiAssert(index>=0 && index<ackPool.size);
2642             CMI_GNI_LOCK(ackPool.lock);
2643             CmiAssert(GetIndexType(ackPool, index) == 1);
2644             msg = GetIndexAddress(ackPool, index);
2645             CMI_GNI_UNLOCK(ackPool.lock);
2646 #if PRINT_SYH
2647             printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2648 #endif
2649 #if ! USE_LRTS_MEMPOOL
2650            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2651 #else
2652             DecreaseMsgInSend(msg);
2653 #endif
2654             if(NoMsgInSend(msg))
2655                 buffered_send_msg -= GetMempoolsize(msg);
2656             CmiFree(msg);
2657             IndexPool_freeslot(&ackPool, index);
2658 #if CMI_EXERT_SEND_LARGE_CAP
2659             SEND_large_pending--;
2660 #endif
2661             break;
2662 #if CMK_PERSISTENT_COMM
2663         case 1:  {    // PERSISTENT
2664             CmiLock(persistPool.lock);
2665             CmiAssert(GetIndexType(persistPool, index) == 2);
2666             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2667             CmiUnlock(persistPool.lock);
2668             START_EVENT();
2669             msg = slot->destBuf[0].destAddress;
2670             size = CmiGetMsgSize(msg);
2671             CmiReference(msg);
2672             CMI_CHECK_CHECKSUM(msg, size);
2673             TRACE_COMM_CREATION(EVENT_TIME(), msg);
2674             handleOneRecvedMsg(size, msg); 
2675             break;
2676             }
2677 #endif
2678         default:
2679             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2680             CmiAbort("PumpRemoteTransactions: unknown type");
2681         }
2682     }
2683     if(status == GNI_RC_ERROR_RESOURCE)
2684     {
2685         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2686     }
2687 }
2688 #endif
2689
2690 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2691 {
2692     gni_cq_entry_t          ev;
2693     gni_return_t            status;
2694     uint64_t                type, inst_id;
2695     gni_post_descriptor_t   *tmp_pd;
2696     MSG_LIST                *ptr;
2697     CONTROL_MSG             *ack_msg_tmp;
2698     ACK_MSG                 *ack_msg;
2699     uint8_t                 msg_tag;
2700 #if CMK_DIRECT
2701     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
2702 #endif
2703     SMSG_QUEUE         *queue = &smsg_queue;
2704 #if CMI_PUMPLOCALTRANSACTIONS_CAP
2705     int         pump_count = 0;
2706     while(pump_count < PumpLocalTransactions_cap) {
2707         pump_count++;
2708 #else
2709     while(1) {
2710 #endif
2711         CMI_GNI_LOCK(my_cq_lock) 
2712         status = GNI_CqGetEvent(my_tx_cqh, &ev);
2713         CMI_GNI_UNLOCK(my_cq_lock)
2714         if(status != GNI_RC_SUCCESS) break;
2715         
2716         type = GNI_CQ_GET_TYPE(ev);
2717         if (type == GNI_CQ_EVENT_TYPE_POST)
2718         {
2719
2720 #if CMI_EXERT_RECV_RDMA_CAP
2721             if(RDMA_pending <=0) CmiAbort(" pending error\n");
2722             RDMA_pending--;
2723 #endif
2724             inst_id     = GNI_CQ_GET_INST_ID(ev);
2725 #if PRINT_SYH
2726             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
2727 #endif
2728             CMI_GNI_LOCK(my_cq_lock)
2729             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2730             CMI_GNI_UNLOCK(my_cq_lock)
2731
2732             switch (tmp_pd->type) {
2733 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2734             case GNI_POST_RDMA_PUT:
2735 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2736                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2737 #endif
2738             case GNI_POST_FMA_PUT:
2739                 if(tmp_pd->amo_cmd == 1) {
2740 #if CMK_DIRECT
2741                     //sender ACK to receiver to trigger it is done
2742                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2743                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2744                     msg_tag = DIRECT_PUT_DONE_TAG;
2745 #endif
2746                 }
2747                 else {
2748                     CmiFree((void *)tmp_pd->local_addr);
2749 #if REMOTE_EVENT
2750                     FreePostDesc(tmp_pd);
2751                     continue;
2752 #elif CQWRITE
2753                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2754                     FreePostDesc(tmp_pd);
2755                     continue;
2756 #else
2757                     MallocControlMsg(ack_msg_tmp);
2758                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2759                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2760                     ack_msg_tmp->length  = tmp_pd->length;
2761                     msg_tag = PUT_DONE_TAG;
2762 #endif
2763                 }
2764                 break;
2765 #endif
2766             case GNI_POST_RDMA_GET:
2767             case GNI_POST_FMA_GET:  {
2768 #if  ! USE_LRTS_MEMPOOL
2769                 MallocControlMsg(ack_msg_tmp);
2770                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2771                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2772                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2773                 msg_tag = ACK_TAG;  
2774 #else
2775 #if CMK_WITH_STATS
2776                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2777 #endif
2778                 int seq_id = tmp_pd->cqwrite_value;
2779                 if(seq_id > 0)      // BIG_MSG
2780                 {
2781                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2782                     MallocControlMsg(ack_msg_tmp);
2783                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2784                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
2785                     ack_msg_tmp->seq_id = seq_id;
2786                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2787                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2788                     ack_msg_tmp->length = tmp_pd->length;
2789                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
2790                     msg_tag = BIG_MSG_TAG; 
2791                 } 
2792                 else
2793                 {
2794                     msg_tag = ACK_TAG; 
2795 #if  !REMOTE_EVENT && !CQWRITE
2796                     MallocAckMsg(ack_msg);
2797                     ack_msg->source_addr = tmp_pd->remote_addr;
2798 #endif
2799                 }
2800 #endif
2801                 break;
2802             }
2803             case  GNI_POST_CQWRITE:
2804                    FreePostDesc(tmp_pd);
2805                    continue;
2806             default:
2807                 CmiPrintf("type=%d\n", tmp_pd->type);
2808                 CmiAbort("PumpLocalTransactions: unknown type!");
2809             }      /* end of switch */
2810
2811 #if CMK_DIRECT
2812             if (tmp_pd->amo_cmd == 1) {
2813                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL); 
2814                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
2815             }
2816             else
2817 #endif
2818             if (msg_tag == ACK_TAG) {
2819 #if !REMOTE_EVENT
2820 #if   !CQWRITE
2821                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL); 
2822                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2823 #else
2824                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
2825 #endif
2826 #endif
2827             }
2828             else {
2829                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL); 
2830                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2831             }
2832 #if CMK_PERSISTENT_COMM
2833             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2834 #endif
2835             {
2836                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
2837 #if PRINT_SYH
2838                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2839 #endif
2840                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr); 
2841                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr); 
2842
2843                     START_EVENT();
2844                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2845                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2846 #if MACHINE_DEBUG_LOG
2847                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2848                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2849                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2850 #endif
2851                     TRACE_COMM_CREATION(EVENT_TIME(), (void*)tmp_pd->local_addr);
2852                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2853                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
2854                 }else if(msg_tag == BIG_MSG_TAG){
2855                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2856                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2857                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2858                         START_EVENT();
2859 #if PRINT_SYH
2860                         printf("Pipeline msg done [%d]\n", myrank);
2861 #endif
2862 #if     CMK_SMP_TRACE_COMMTHREAD
2863                         if( tmp_pd->cqwrite_value == 1)
2864                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr); 
2865 #endif
2866                         TRACE_COMM_CREATION(EVENT_TIME(), msg);
2867                         CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2868                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
2869                     }
2870                 }
2871             }
2872             FreePostDesc(tmp_pd);
2873         }
2874     } //end while
2875     if(status == GNI_RC_ERROR_RESOURCE)
2876     {
2877         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2878         GNI_RC_CHECK("Smsg_tx_cq full", status);
2879     }
2880 }
2881
2882 static void  SendRdmaMsg( BufferList sendqueue)
2883 {
2884     gni_return_t            status = GNI_RC_SUCCESS;
2885     gni_mem_handle_t        msg_mem_hndl;
2886     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2887     RDMA_REQUEST            *pre = 0;
2888     uint64_t                register_size = 0;
2889     void                    *msg;
2890     int                     i;
2891
2892     int len = PCQueueLength(sendqueue);
2893     for (i=0; i<len; i++)
2894     {
2895 #if CMI_EXERT_RECV_RDMA_CAP
2896         if( RDMA_pending >= RDMA_cap) break;
2897 #endif
2898         CMI_PCQUEUEPOP_LOCK( sendqueue)
2899         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
2900         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
2901         if (ptr == NULL) break;
2902         
2903         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2904         gni_post_descriptor_t *pd = ptr->pd;
2905         
2906         msg = (void*)(pd->local_addr);
2907         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2908         register_size = 0;
2909         if(pd->cqwrite_value == 0) {
2910             if(NoMsgInRecv(msg))
2911                 register_size = GetMempoolsize(msg);
2912         }
2913
2914         if(status == GNI_RC_SUCCESS)        //mem register good
2915         {
2916             int destNode = ptr->destNode;
2917             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2918             CMI_GNI_LOCK(lock);
2919 #if REMOTE_EVENT
2920             if( pd->cqwrite_value == 0) {
2921                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2922                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
2923                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2924             }
2925 #if CMK_PERSISTENT_COMM
2926             else if (pd->cqwrite_value == PERSIST_SEQ) {
2927                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2928                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
2929                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2930             }
2931 #endif
2932 #endif
2933 #if CMK_WITH_STATS
2934             RDMA_TRY_SEND(pd->type)
2935 #endif
2936 #if CMK_SMP_TRACE_COMMTHREAD
2937             if(IS_PUT(pd->type))
2938             {
2939                  START_EVENT();
2940                  TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
2941             }
2942 #endif
2943
2944             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2945             {
2946                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
2947             }
2948             else
2949             {
2950                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
2951             }
2952             CMI_GNI_UNLOCK(lock);
2953             
2954             if(status == GNI_RC_SUCCESS)    //post good
2955             {
2956 #if CMI_EXERT_RECV_RDMA_CAP
2957                 RDMA_pending ++;
2958 #endif
2959                 if(pd->cqwrite_value == 0)
2960                 {
2961 #if CMK_SMP_TRACE_COMMTHREAD 
2962                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2963 #endif
2964                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2965                 }
2966 #if  CMK_WITH_STATS
2967                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2968                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2969 #endif
2970 #if MACHINE_DEBUG_LOG
2971                 buffered_recv_msg += register_size;
2972                 MACHSTATE(8, "GO request from buffered\n"); 
2973 #endif
2974 #if PRINT_SYH
2975                 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
2976 #endif
2977                 FreeRdmaRequest(ptr);
2978             }else           // cannot post
2979             {
2980                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2981 #if PRINT_SYH
2982                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
2983 #endif
2984                 break;
2985             }
2986         } else          //memory registration fails
2987         {
2988             PCQueuePush(sendqueue, (char*)ptr);
2989         }
2990     } //end while
2991 }
2992
2993 static 
2994 inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
2995 {
2996     CONTROL_MSG         *control_msg_tmp;
2997     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
2998
2999     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3000     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
3001             /* connection not exists yet */
3002 #if CMK_SMP
3003             /* non-smp case, connect is issued in send_smsg_message */
3004         if (smsg_connected_flag[ptr->destNode] == 0)
3005             connect_to(ptr->destNode); 
3006 #endif
3007     }
3008     else
3009     switch(ptr->tag)
3010     {
3011     case SMALL_DATA_TAG:
3012         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3013         if(status == GNI_RC_SUCCESS)
3014         {
3015             CmiFree(ptr->msg);
3016         }
3017         break;
3018     case LMSG_INIT_TAG:
3019     case LMSG_OOB_INIT_TAG:
3020         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3021         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
3022         break;
3023 #if !REMOTE_EVENT && !CQWRITE
3024     case ACK_TAG:
3025         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3026         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3027         break;
3028 #endif
3029     case BIG_MSG_TAG:
3030         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3031         if(status == GNI_RC_SUCCESS)
3032         {
3033             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3034         }
3035         break;
3036 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE 
3037     case PUT_DONE_TAG:
3038         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);  
3039         if(status == GNI_RC_SUCCESS)
3040         {
3041             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3042         }
3043         break;
3044 #endif
3045 #if CMK_DIRECT
3046     case DIRECT_PUT_DONE_TAG:
3047         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);  
3048         if(status == GNI_RC_SUCCESS)
3049         {
3050             free((CMK_DIRECT_HEADER*)ptr->msg);
3051         }
3052         break;
3053 #endif
3054     default:
3055         printf("Weird tag\n");
3056         CmiAbort("should not happen\n");
3057     }       // end switch
3058     return status;
3059 }
3060
3061 // return 1 if all messages are sent
3062
3063 #if ONE_SEND_QUEUE
3064
3065 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3066 {
3067     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3068     CONTROL_MSG         *control_msg_tmp;
3069     gni_return_t        status;
3070     int                 done = 1;
3071     uint64_t            register_size;
3072     void                *register_addr;
3073     int                 index_previous = -1;
3074 #if     CMI_SENDBUFFERSMSG_CAP
3075     int                 sent_length = 0;
3076 #endif
3077     int          index = 0;
3078     memset(destpe_avail, 0, mysize * sizeof(char));
3079     for (index=0; index<1; index++)
3080     {
3081         int i, len = PCQueueLength(queue->sendMsgBuf);
3082         for (i=0; i<len; i++) 
3083         {
3084             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3085             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3086             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3087             if(ptr == NULL) break;
3088             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3089                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3090                 continue;
3091             }
3092             status = _sendOneBufferedSmsg(queue, ptr);
3093 #if CMI_SENDBUFFERSMSG_CAP
3094             sent_length++;
3095 #endif
3096             if(status == GNI_RC_SUCCESS)
3097             {
3098 #if PRINT_SYH
3099                 buffered_smsg_counter--;
3100                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3101 #endif
3102                 FreeMsgList(ptr);
3103             }else {
3104                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3105                 done = 0;
3106                 if(status == GNI_RC_ERROR_RESOURCE)
3107                 {
3108                     destpe_avail[ptr->destNode] = 1;
3109                 }
3110             } 
3111         } //end while
3112     }   // end pooling for all cores
3113     return done;
3114 }
3115
3116 #else   /*  ! ONE_SEND_QUEUE  */
3117
3118 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3119 {
3120     MSG_LIST            *ptr;
3121     gni_return_t        status;
3122     int                 done = 1;
3123 #if     CMI_SENDBUFFERSMSG_CAP
3124     int                 sent_length = 0;
3125 #endif
3126     int idx;
3127 #if SMP_LOCKS
3128     int          index = -1;
3129     int nonempty = PCQueueLength(queue->nonEmptyQueues);
3130     for(idx =0; idx<nonempty; idx++) 
3131     {
3132         index++;  if (index >= nonempty) index = 0;
3133 #if CMI_SENDBUFFERSMSG_CAP
3134         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3135 #endif
3136         CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
3137         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
3138         CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
3139         if(current_list == NULL) break; 
3140         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
3141             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3142             continue;
3143         }
3144         PCQueue current_queue= current_list->sendSmsgBuf;
3145         CmiLock(current_list->lock);
3146         int i, len = PCQueueLength(current_queue);
3147         current_list->pushed = 0;
3148         CmiUnlock(current_list->lock);
3149 #else      /* ! SMP_LOCKS */
3150     static int          index = -1;
3151     for(idx =0; idx<mysize; idx++) 
3152     {
3153         index++;  if (index == mysize) index = 0;
3154 #if CMI_SENDBUFFERSMSG_CAP
3155         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3156 #endif
3157         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
3158         //if (index == myrank) continue;
3159         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3160         int i, len = PCQueueLength(current_queue);
3161 #endif
3162         for (i=0; i<len; i++)  {
3163             CMI_PCQUEUEPOP_LOCK(current_queue)
3164             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3165             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3166             if (ptr == 0) break;
3167
3168             status = _sendOneBufferedSmsg(queue, ptr);
3169 #if CMI_SENDBUFFERSMSG_CAP
3170             sent_length++;
3171 #endif
3172             if(status == GNI_RC_SUCCESS)
3173             {
3174 #if PRINT_SYH
3175                 buffered_smsg_counter--;
3176                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3177 #endif
3178                 FreeMsgList(ptr);
3179             }else {
3180                 PCQueuePush(current_queue, (char*)ptr);
3181                 done = 0;
3182                 if(status == GNI_RC_ERROR_RESOURCE)
3183                 {
3184                     break;
3185                 }
3186             } 
3187         } //end for i
3188 #if SMP_LOCKS
3189         CmiLock(current_list->lock);
3190         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3191         {
3192             current_list->pushed = 1;
3193             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3194         }
3195         CmiUnlock(current_list->lock); 
3196 #endif
3197     }   // end pooling for all cores
3198     return done;
3199 }
3200
3201 #endif
3202
3203 static void ProcessDeadlock();
3204 void LrtsAdvanceCommunication(int whileidle)
3205 {
3206     static int count = 0;
3207     /*  Receive Msg first */
3208 #if CMK_SMP_TRACE_COMMTHREAD
3209     double startT, endT;
3210 #endif
3211     if (useDynamicSMSG && whileidle)
3212     {
3213 #if CMK_SMP_TRACE_COMMTHREAD
3214         startT = CmiWallTimer();
3215 #endif
3216         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3217 #if CMK_SMP_TRACE_COMMTHREAD
3218         endT = CmiWallTimer();
3219         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3220 #endif
3221     }
3222
3223     SEND_OOB_SMSG(smsg_oob_queue)
3224     PUMP_REMOTE_HIGHPRIORITY
3225     PUMP_LOCAL_HIGHPRIORITY
3226     POST_HIGHPRIORITY_RDMA
3227     // Receiving small messages and persistent
3228 #if CMK_SMP_TRACE_COMMTHREAD
3229     startT = CmiWallTimer();
3230 #endif
3231     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3232 #if CMK_SMP_TRACE_COMMTHREAD
3233     endT = CmiWallTimer();
3234     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3235 #endif
3236
3237     SEND_OOB_SMSG(smsg_oob_queue)
3238     PUMP_REMOTE_HIGHPRIORITY
3239     PUMP_LOCAL_HIGHPRIORITY
3240     POST_HIGHPRIORITY_RDMA
3241     
3242     ///* Send buffered Message */
3243 #if CMK_SMP_TRACE_COMMTHREAD
3244     startT = CmiWallTimer();
3245 #endif
3246 #if CMK_USE_OOB
3247     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
3248 #else
3249     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
3250 #endif
3251 #if CMK_SMP_TRACE_COMMTHREAD
3252     endT = CmiWallTimer();
3253     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3254 #endif
3255
3256     SEND_OOB_SMSG(smsg_oob_queue)
3257     PUMP_REMOTE_HIGHPRIORITY
3258     PUMP_LOCAL_HIGHPRIORITY
3259     POST_HIGHPRIORITY_RDMA
3260
3261     //Pump Get messages or PUT messages
3262 #if CMK_SMP_TRACE_COMMTHREAD
3263     startT = CmiWallTimer();
3264 #endif
3265     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3266 #if MULTI_THREAD_SEND
3267     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3268 #endif
3269 #if CMK_SMP_TRACE_COMMTHREAD
3270     endT = CmiWallTimer();
3271     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3272 #endif
3273     
3274     SEND_OOB_SMSG(smsg_oob_queue)
3275     PUMP_REMOTE_HIGHPRIORITY
3276     PUMP_LOCAL_HIGHPRIORITY
3277     POST_HIGHPRIORITY_RDMA
3278     //Pump Remote event
3279 #if CMK_SMP_TRACE_COMMTHREAD
3280     startT = CmiWallTimer();
3281 #endif
3282 #if CQWRITE
3283     PumpCqWriteTransactions();
3284 #endif
3285 #if REMOTE_EVENT
3286     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
3287 #endif
3288 #if CMK_SMP_TRACE_COMMTHREAD
3289     endT = CmiWallTimer();
3290     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3291 #endif
3292
3293     SEND_OOB_SMSG(smsg_oob_queue)
3294     PUMP_REMOTE_HIGHPRIORITY
3295     PUMP_LOCAL_HIGHPRIORITY
3296     POST_HIGHPRIORITY_RDMA
3297
3298 #if CMK_SMP_TRACE_COMMTHREAD
3299     startT = CmiWallTimer();
3300 #endif
3301     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3302 #if CMK_SMP_TRACE_COMMTHREAD
3303     endT = CmiWallTimer();
3304     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3305 #endif
3306
3307 #if CMK_SMP && ! LARGEPAGE
3308     if (_detected_hang)  ProcessDeadlock();
3309 #endif
3310 }
3311
3312 static void set_smsg_max()
3313 {
3314     char *env;
3315
3316     if(mysize <=512)
3317     {
3318         SMSG_MAX_MSG = 1024;
3319     }else if (mysize <= 4096)
3320     {
3321         SMSG_MAX_MSG = 1024;
3322     }else if (mysize <= 16384)
3323     {
3324         SMSG_MAX_MSG = 512;
3325     }else {
3326         SMSG_MAX_MSG = 256;
3327     }
3328
3329     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3330     if (env) SMSG_MAX_MSG = atoi(env);
3331     CmiAssert(SMSG_MAX_MSG > 0);
3332 }    
3333
3334 /* useDynamicSMSG */
3335 static void _init_dynamic_smsg()
3336 {
3337     gni_return_t status;
3338     uint32_t     vmdh_index = -1;
3339     int i;
3340
3341     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3342     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3343     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3344     for(i=0; i<mysize; i++) {
3345         smsg_connected_flag[i] = 0;
3346         smsg_attr_vector_local[i] = NULL;
3347         smsg_attr_vector_remote[i] = NULL;
3348     }
3349
3350     set_smsg_max();
3351
3352     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3353     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3354     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3355     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3356     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3357
3358     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3359     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3360     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3361     bzero(mailbox_list->mailbox_base, mailbox_list->size);
3362     mailbox_list->offset = 0;
3363     mailbox_list->next = 0;
3364     
3365     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3366         mailbox_list->size, smsg_rx_cqh,
3367         GNI_MEM_READWRITE,   
3368         vmdh_index,
3369         &(mailbox_list->mem_hndl));
3370     GNI_RC_CHECK("MEMORY registration for smsg", status);
3371
3372     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3373     GNI_RC_CHECK("Unbound EP", status);
3374     
3375     alloc_smsg_attr(&send_smsg_attr);
3376
3377     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3378     GNI_RC_CHECK("post unbound datagram", status);
3379
3380       /* always pre-connect to proc 0 */
3381     //if (myrank != 0) connect_to(0);
3382
3383     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3384     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3385 }
3386
3387 static void _init_static_smsg()
3388 {
3389     gni_smsg_attr_t      *smsg_attr;
3390     gni_smsg_attr_t      remote_smsg_attr;
3391     gni_smsg_attr_t      *smsg_attr_vec;
3392     gni_mem_handle_t     my_smsg_mdh_mailbox;
3393     int      ret, i;
3394     gni_return_t status;
3395     uint32_t              vmdh_index = -1;
3396     mdh_addr_t            base_infor;
3397     mdh_addr_t            *base_addr_vec;
3398
3399     set_smsg_max();
3400     
3401     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3402     
3403     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3404     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3405     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3406     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3407     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3408     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3409     CmiAssert(ret == 0);
3410     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3411     
3412     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3413             smsg_memlen*(mysize), smsg_rx_cqh,
3414             GNI_MEM_READWRITE,   
3415             vmdh_index,
3416             &my_smsg_mdh_mailbox);
3417     register_memory_size += smsg_memlen*(mysize);
3418     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3419
3420     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3421     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3422
3423     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3424     base_infor.mdh =  my_smsg_mdh_mailbox;
3425     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3426
3427     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3428  
3429     for(i=0; i<mysize; i++)
3430     {
3431         if(i==myrank)
3432             continue;
3433         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3434         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3435         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3436         smsg_attr[i].mbox_offset = i*smsg_memlen;
3437         smsg_attr[i].buff_size = smsg_memlen;
3438         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3439         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3440     }
3441
3442     for(i=0; i<mysize; i++)
3443     {
3444         if (myrank == i) continue;
3445
3446         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3447         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3448         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3449         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3450         remote_smsg_attr.buff_size = smsg_memlen;
3451         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3452         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3453
3454         /* initialize the smsg channel */
3455         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3456         GNI_RC_CHECK("SMSG Init", status);
3457     } //end initialization
3458
3459     free(base_addr_vec);
3460     free(smsg_attr);
3461
3462     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3463     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3464
3465
3466 inline
3467 static void _init_send_queue(SMSG_QUEUE *queue)
3468 {
3469      int i;
3470 #if ONE_SEND_QUEUE
3471      queue->sendMsgBuf = PCQueueCreate();
3472      destpe_avail = (char*)malloc(mysize * sizeof(char));
3473 #else
3474      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3475 #if SMP_LOCKS
3476      queue->nonEmptyQueues = PCQueueCreate();
3477 #endif
3478      for(i =0; i<mysize; i++)
3479      {
3480          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3481 #if SMP_LOCKS
3482          queue->smsg_msglist_index[i].pushed = 0;
3483          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3484          queue->smsg_msglist_index[i].destpe = i;
3485 #endif
3486      }
3487 #endif
3488 }
3489
3490 inline
3491 static void _init_smsg()
3492 {
3493     if(mysize > 1) {
3494         if (useDynamicSMSG)
3495             _init_dynamic_smsg();
3496         else
3497             _init_static_smsg();
3498     }
3499
3500     _init_send_queue(&smsg_queue);
3501 #if CMK_USE_OOB
3502     _init_send_queue(&smsg_oob_queue);
3503 #endif
3504 }
3505
3506 static void _init_static_msgq()
3507 {
3508     gni_return_t status;
3509     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
3510     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3511     msgq_attrs.smsg_q_sz = 1;
3512     msgq_attrs.rcv_pool_sz = 1;
3513     msgq_attrs.num_msgq_eps = 2;
3514     msgq_attrs.nloc_insts = 8;
3515     msgq_attrs.modes = 0;
3516     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3517
3518     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3519     GNI_RC_CHECK("MSGQ Init", status);
3520
3521
3522 }
3523
3524
3525 static CmiUInt8 total_mempool_size = 0;
3526 static CmiUInt8 total_mempool_calls = 0;
3527
3528 #if USE_LRTS_MEMPOOL
3529
3530 inline
3531 static void *_alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag, gni_cq_handle_t cqh)
3532 {
3533     void *pool;
3534     int ret;
3535     gni_return_t status = GNI_RC_SUCCESS;
3536
3537     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
3538     if (*size < default_size) *size = default_size;
3539 #if LARGEPAGE
3540     // round up to be multiple of _tlbpagesize
3541     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3542     *size = ALIGNHUGEPAGE(*size);
3543 #endif
3544     total_mempool_size += *size;
3545     total_mempool_calls += 1;
3546 #if   !LARGEPAGE
3547     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
3548     {
3549         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3550         CmiAbort("alloc_mempool_block");
3551     }
3552 #endif
3553 #if LARGEPAGE
3554     pool = my_get_huge_pages(*size);
3555     ret = pool==NULL;
3556 #else
3557     ret = posix_memalign(&pool, ALIGNBUF, *size);
3558 #endif
3559     if (ret != 0) {
3560       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3561       if (ret == ENOMEM)
3562         CmiAbort("alloc_mempool_block: out of memory.");
3563       else
3564         CmiAbort("alloc_mempool_block: posix_memalign failed");
3565     }
3566 #if LARGEPAGE
3567     CmiMemLock();
3568     register_count++;
3569     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, cqh, status);
3570     CmiMemUnlock();
3571     if(status != GNI_RC_SUCCESS) {
3572         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3573 sweep_mempool(CpvAccess(mempool));
3574     }
3575     GNI_RC_CHECK("MEMORY_REGISTER", status);
3576 #else
3577     SetMemHndlZero((*mem_hndl));
3578 #endif
3579     return pool;
3580 }
3581
3582 inline
3583 static void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3584 {
3585     return _alloc_mempool_block(size, mem_hndl, expand_flag, rdma_rx_cqh);
3586 }
3587
3588 #if CMK_PERSISTENT_COMM
3589 inline
3590 static void *alloc_persistent_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3591 {
3592     return _alloc_mempool_block(size, mem_hndl, expand_flag, highpriority_rx_cqh);
3593 }
3594 #endif
3595
3596 // ptr is a block head pointer
3597 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3598 {
3599     if(!(IsMemHndlZero(mem_hndl)))
3600     {
3601         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3602     }
3603 #if LARGEPAGE
3604     my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3605 #else
3606     free(ptr);
3607 #endif
3608 }
3609 #endif
3610
3611 void LrtsPreCommonInit(int everReturn){
3612 #if USE_LRTS_MEMPOOL
3613     CpvInitialize(mempool_type*, mempool);
3614     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3615 #if CMK_PERSISTENT_COMM
3616     CpvInitialize(mempool_type*, persistent_mempool);
3617     CpvAccess(persistent_mempool) = mempool_init(_mempool_size, alloc_persistent_mempool_block, free_mempool_block, _mempool_size_limit);
3618 #endif
3619     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
3620 #endif
3621 }
3622
3623 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3624 {
3625     register int            i;
3626     int                     rc;
3627     int                     device_id = 0;
3628     unsigned int            remote_addr;
3629     gni_cdm_handle_t        cdm_hndl;
3630     gni_return_t            status = GNI_RC_SUCCESS;
3631     uint32_t                vmdh_index = -1;
3632     uint8_t                 ptag;
3633     unsigned int            local_addr, *MPID_UGNI_AllAddr;
3634     int                     first_spawned;
3635     int                     physicalID;
3636     char                   *env;
3637
3638     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
3639     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3640     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
3641    
3642     status = PMI_Init(&first_spawned);
3643     GNI_RC_CHECK("PMI_Init", status);
3644
3645     status = PMI_Get_size(&mysize);
3646     GNI_RC_CHECK("PMI_Getsize", status);
3647
3648     status = PMI_Get_rank(&myrank);
3649     GNI_RC_CHECK("PMI_getrank", status);
3650
3651     //physicalID = CmiPhysicalNodeID(myrank);
3652     
3653     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3654
3655     *myNodeID = myrank;
3656     *numNodes = mysize;
3657   
3658 #if MULTI_THREAD_SEND
3659     /* Currently, we only consider the case that comm. thread will only recv msgs */
3660     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3661 #endif
3662
3663 #if CMI_EXERT_SEND_LARGE_CAP
3664     CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
3665 #endif
3666
3667 #if CMI_SENDBUFFERSMSG_CAP
3668     CmiGetArgInt(*argv,"+useSendBufferCap", &SendBufferMsg_cap);
3669 #endif
3670
3671 #if CMI_PUMPNETWORKSMSG_CAP 
3672     CmiGetArgInt(*argv,"+usePumpSmsgCap", &PumpNetworkSmsg_cap);
3673 #endif
3674
3675     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3676     
3677     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3678     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3679     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3680
3681     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3682     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3683     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3684
3685     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3686     if (env) useDynamicSMSG = 1;
3687     if (!useDynamicSMSG)
3688       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3689     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3690     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3691     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3692     
3693     if(myrank == 0)
3694     {
3695         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3696         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3697     }
3698 #ifdef USE_ONESIDED
3699     onesided_init(NULL, &onesided_hnd);
3700
3701     // this is a GNI test, so use the libonesided bypass functionality
3702     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3703     local_addr = gniGetNicAddress();
3704 #else
3705     ptag = get_ptag();
3706     cookie = get_cookie();
3707 #if 0
3708     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3709 #endif
3710     //Create and attach to the communication  domain */
3711     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3712     GNI_RC_CHECK("GNI_CdmCreate", status);
3713     //* device id The device id is the minor number for the device
3714     //that is assigned to the device by the system when the device is created.
3715     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3716     //where X is the device number 0 default 
3717     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3718     GNI_RC_CHECK("GNI_CdmAttach", status);
3719     local_addr = get_gni_nic_address(0);
3720 #endif
3721     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3722     _MEMCHECK(MPID_UGNI_AllAddr);
3723     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3724     /* create the local completion queue */
3725     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3726     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3727     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3728 #if MULTI_THREAD_SEND
3729     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3730     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3731 #endif
3732
3733 #if CMK_USE_OOB
3734     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highprior_rdma_tx_cqh);
3735     GNI_RC_CHECK("GNI_CqCreate high priority RDMA (tx)", status);
3736 #endif
3737     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3738
3739     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3740     GNI_RC_CHECK("Create CQ (rx)", status);
3741     
3742     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3743     GNI_RC_CHECK("Create Post CQ (rx)", status);
3744    
3745 #if CMK_PERSISTENT_COMM
3746     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highpriority_rx_cqh);
3747     GNI_RC_CHECK("Create Post CQ (rx)", status);
3748 #endif
3749     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3750     //GNI_RC_CHECK("Create BTE CQ", status);
3751
3752     /* create the endpoints. they need to be bound to allow later CQWrites to them */
3753     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3754     _MEMCHECK(ep_hndl_array);
3755 #if MULTI_THREAD_SEND 
3756     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3757     //default_tx_cq_lock = CmiCreateLock();
3758     rdma_tx_cq_lock = CmiCreateLock();
3759     smsg_rx_cq_lock = CmiCreateLock();
3760     //global_gni_lock  = CmiCreateLock();
3761     //rx_cq_lock  = CmiCreateLock();
3762 #endif
3763     for (i=0; i<mysize; i++) {
3764         if(i == myrank) continue;
3765         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3766         GNI_RC_CHECK("GNI_EpCreate ", status);   
3767         remote_addr = MPID_UGNI_AllAddr[i];
3768         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3769         GNI_RC_CHECK("GNI_EpBind ", status);   
3770     }
3771
3772     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3773     _init_smsg();
3774     PMI_Barrier();
3775
3776 #if     USE_LRTS_MEMPOOL
3777     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3778     if (env) {
3779         _totalmem = CmiReadSize(env);
3780         if (myrank == 0)
3781             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3782     }
3783
3784     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3785     if (env) _mempool_size = CmiReadSize(env);
3786     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
3787         _mempool_size = CmiReadSize(env);
3788
3789
3790     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3791     if (env) {
3792         MAX_REG_MEM = CmiReadSize(env);
3793         user_set_flag = 1;
3794     }
3795     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
3796         MAX_REG_MEM = CmiReadSize(env);
3797         user_set_flag = 1;
3798     }
3799
3800     env = getenv("CHARM_UGNI_SEND_MAX");
3801     if (env) {
3802         MAX_BUFF_SEND = CmiReadSize(env);
3803         user_set_flag = 1;
3804     }
3805     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
3806         MAX_BUFF_SEND = CmiReadSize(env);
3807         user_set_flag = 1;
3808     }
3809
3810     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3811     if (env) {
3812         _mempool_size_limit = CmiReadSize(env);
3813     }
3814
3815     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3816     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
3817
3818     if (myrank==0) {
3819         printf("Charm++> memory pool init block size: %1.fMB, total memory pool limit %1.fMB (0 means no limite)\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3820         printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3821         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
3822             /* memblock can expand to BIG_MSG * 2 size */
3823             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
3824             CmiAbort("mempool maximum size is too small. \n");
3825         }
3826 #if MULTI_THREAD_SEND
3827         printf("Charm++> worker thread sending messages\n");
3828 #elif COMM_THREAD_SEND
3829         printf("Charm++> only comm thread send/recv messages\n");
3830 #endif
3831     }
3832
3833 #endif     /* end of USE_LRTS_MEMPOOL */
3834
3835     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3836     if (env) {
3837         BIG_MSG = CmiReadSize(env);
3838         if (BIG_MSG < ONE_SEG)
3839           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3840     }
3841     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3842     if (env) {
3843         BIG_MSG_PIPELINE = atoi(env);
3844     }
3845
3846     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3847     if (env) _checkProgress = 0;
3848     if (mysize == 1) _checkProgress = 0;
3849
3850 #if CMI_EXERT_RECV_RDMA_CAP
3851     env = getenv("CHARM_UGNI_RDMA_MAX");
3852     if (env)  {
3853         RDMA_pending = atoi(env);
3854         if (myrank == 0)
3855             printf("Charm++> Max pending RDMA set to: %d\n", RDMA_pending);
3856     }
3857 #endif
3858     
3859     /*
3860     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3861     if (env) 
3862         _tlbpagesize = CmiReadSize(env);
3863     */
3864     /* real gethugepagesize() is only available when hugetlb module linked */
3865     _tlbpagesize = gethugepagesize();
3866     if (myrank == 0) {
3867         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3868     }
3869
3870 #if LARGEPAGE
3871     if (_tlbpagesize == 4096) {
3872         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3873     }
3874 #endif
3875
3876       /* stats related arguments */
3877 #if CMK_WITH_STATS
3878     CmiGetArgStringDesc(*argv,"+gni_stats_root",&counters_dirname,"counter directory name, default counters");
3879
3880     print_stats = CmiGetArgFlag(*argv, "+print_stats");
3881     
3882     stats_off = CmiGetArgFlag(*argv, "+stats_off");
3883
3884     init_comm_stats();
3885 #endif
3886
3887     /* init DMA buffer for medium message */
3888
3889     //_init_DMA_buffer();
3890     
3891     free(MPID_UGNI_AllAddr);
3892
3893     sendRdmaBuf = PCQueueCreate();
3894     sendHighPriorBuf = PCQueueCreate();
3895
3896 #if MACHINE_DEBUG_LOG
3897     char ln[200];
3898     sprintf(ln,"debugLog.%d",myrank);
3899     debugLog=fopen(ln,"w");
3900 #endif
3901
3902 //    NTK_Init();
3903 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3904
3905 #if  REMOTE_EVENT
3906     SHIFT = 1;
3907     while (1<<SHIFT < mysize) SHIFT++;
3908     CmiAssert(SHIFT < 31);
3909     IndexPool_init(&ackPool);
3910 #if CMK_PERSISTENT_COMM
3911     IndexPool_init(&persistPool);
3912 #endif
3913 #endif
3914 }
3915
3916 void* LrtsAlloc(int n_bytes, int header)
3917 {
3918     void *ptr = NULL;
3919 #if 0
3920     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3921 #endif
3922     if(n_bytes <= SMSG_MAX_MSG)
3923     {
3924         int totalsize = n_bytes+header;
3925         ptr = malloc(totalsize);
3926     }
3927     else {
3928         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3929 #if     USE_LRTS_MEMPOOL
3930         n_bytes = ALIGN64(n_bytes);
3931         if(n_bytes < BIG_MSG)
3932         {
3933             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3934             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3935         }else 
3936         {
3937 #if LARGEPAGE
3938             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3939             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3940             char *res = my_get_huge_pages(n_bytes);
3941 #else
3942             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3943 #endif
3944             if (res) ptr = res + ALIGNBUF - header;
3945         }
3946 #else
3947         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3948         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3949         ptr = res + ALIGNBUF - header;
3950 #endif
3951     }
3952     return ptr;
3953 }
3954
3955 void  LrtsFree(void *msg)
3956 {
3957     CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3958 #if CMK_PERSISTENT_COMM
3959     if (IS_PERSISTENT_MEMORY(msg)) return;
3960 #endif
3961     if (size <= SMSG_MAX_MSG)
3962         free(msg);
3963     else {
3964         size = ALIGN64(size);
3965         if(size>=BIG_MSG)
3966         {
3967 #if LARGEPAGE
3968             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3969             my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3970 #else
3971             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3972 #endif
3973         }
3974         else {
3975 #if    USE_LRTS_MEMPOOL
3976 #if CMK_SMP
3977             mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3978 #else
3979             mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3980 #endif
3981 #else
3982             free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3983 #endif
3984         }
3985     }
3986 }
3987
3988 void LrtsExit()
3989 {
3990 #if CMK_WITH_STATS
3991 #if CMK_SMP
3992     if(CmiMyRank() == CmiMyNodeSize())
3993 #endif
3994     if (print_stats) print_comm_stats();
3995 #endif
3996     /* free memory ? */
3997 #if USE_LRTS_MEMPOOL
3998     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
3999     mempool_destroy(CpvAccess(mempool));
4000 #endif
4001     PMI_Finalize();
4002     exit(0);
4003 }
4004
4005 void LrtsDrainResources()
4006 {
4007     if(mysize == 1) return;
4008     while (
4009 #if CMK_USE_OOB
4010            !SendBufferMsg(&smsg_oob_queue, NULL) ||
4011 #endif
4012            !SendBufferMsg(&smsg_queue, NULL) 
4013           )
4014     {
4015         if (useDynamicSMSG)
4016             PumpDatagramConnection();
4017         PumpNetworkSmsg();
4018         PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
4019
4020 #if MULTI_THREAD_SEND
4021         PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
4022 #endif
4023
4024 #if CMK_USE_OOB 
4025         PumpLocalTransactions(highprior_rdma_tx_cqh, rdma_tx_cq_lock);
4026 #endif
4027
4028 #if REMOTE_EVENT
4029         PumpRemoteTransactions(rdma_rx_cqh);
4030 #endif
4031         SendRdmaMsg(sendRdmaBuf);
4032         SendRdmaMsg(sendHighPriorBuf);
4033     }
4034     PMI_Barrier();
4035 }
4036
4037 void LrtsAbort(const char *message) {
4038     fprintf(stderr, "[%d] CmiAbort: %s\n", myrank, message);
4039     CmiPrintStackTrace(0);
4040     PMI_Abort(-1, message);
4041 }
4042
4043 /**************************  TIMER FUNCTIONS **************************/
4044 #if CMK_TIMER_USE_SPECIAL
4045 /* MPI calls are not threadsafe, even the timer on some machines */
4046 static CmiNodeLock  timerLock = 0;
4047 static int _absoluteTime = 0;
4048 static int _is_global = 0;
4049 static struct timespec start_ts;
4050
4051 inline int CmiTimerIsSynchronized() {
4052     return 0;
4053 }
4054
4055 inline int CmiTimerAbsolute() {
4056     return _absoluteTime;
4057 }
4058
4059 double CmiStartTimer() {
4060     return 0.0;
4061 }
4062
4063 double CmiInitTime() {
4064     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
4065 }
4066
4067 void CmiTimerInit(char **argv) {
4068     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
4069     if (_absoluteTime && CmiMyPe() == 0)
4070         printf("Charm++> absolute  timer is used\n");
4071     
4072     _is_global = CmiTimerIsSynchronized();
4073
4074
4075     if (_is_global) {
4076         if (CmiMyRank() == 0) {
4077             clock_gettime(CLOCK_MONOTONIC, &start_ts);
4078         }
4079     } else { /* we don't have a synchronous timer, set our own start time */
4080         CmiBarrier();
4081         CmiBarrier();
4082         CmiBarrier();
4083         clock_gettime(CLOCK_MONOTONIC, &start_ts);