Fixed bugs in comm thread tracing
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37
38 #include <gni_pub.h>
39 #include <pmi.h>
40
41 #include "converse.h"
42
43 #if CMK_DIRECT
44 #include "cmidirect.h"
45 #endif
46 #define PRINT_SYH  0
47
48 #define USE_LRTS_MEMPOOL                  1
49 #define REMOTE_EVENT                      0
50
51 #define CMI_EXERT_SEND_CAP      0
52 #define CMI_EXERT_RECV_CAP      0
53
54 #if CMI_EXERT_SEND_CAP
55 #define SEND_CAP 16
56 #endif
57
58 #if CMI_EXERT_RECV_CAP
59 #define RECV_CAP 2
60 #endif
61
62 #if CMK_SMP
63 #define COMM_THREAD_SEND 1
64 //#define MULTI_THREAD_SEND 1
65 #endif
66
67 // Trace communication thread
68 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
69 #define TRACE_THRESHOLD     0.00005
70 #define CMI_MPI_TRACE_MOREDETAILED 0
71 #undef CMI_MPI_TRACE_USEREVENTS
72 #define CMI_MPI_TRACE_USEREVENTS 1
73 #else
74 #undef CMK_SMP_TRACE_COMMTHREAD
75 #define CMK_SMP_TRACE_COMMTHREAD 0
76 #endif
77
78 #define CMK_TRACE_COMMOVERHEAD 0
79 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
80 #undef CMI_MPI_TRACE_USEREVENTS
81 #define CMI_MPI_TRACE_USEREVENTS 1
82 #else
83 #undef CMK_TRACE_COMMOVERHEAD
84 #define CMK_TRACE_COMMOVERHEAD 0
85 #endif
86
87 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
88 CpvStaticDeclare(double, projTraceStart);
89 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
90 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
91 #else
92 #define  START_EVENT()
93 #define  END_EVENT(x)
94 #endif
95
96 #if USE_LRTS_MEMPOOL
97
98 #define oneMB (1024ll*1024)
99 #define oneGB (1024ll*1024*1024)
100
101 static CmiInt8 _mempool_size = 8*oneMB;
102 static CmiInt8 _expand_mem =  4*oneMB;
103 static CmiInt8 _mempool_size_limit = 0;
104
105 static CmiInt8 _totalmem = 0.8*oneGB;
106
107 static int BIG_MSG  =  4*oneMB;
108 static int ONE_SEG  =  2*oneMB;
109 static int BIG_MSG_PIPELINE = 4;
110
111 // dynamic flow control
112 static CmiInt8 buffered_send_msg = 0;
113 static int   register_memory_size = 0;
114
115 #if CMK_SMP && COMM_THREAD_SEND 
116 //Dynamic flow control about memory registration
117 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
118 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
119 #else
120 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
121 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
122 #endif
123
124 #endif     /* end USE_LRTS_MEMPOOL */
125
126 #if CMK_SMP && MULTI_THREAD_SEND
127 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
128 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
129 #else
130 #define     CMI_GNI_LOCK
131 #define     CMI_GNI_UNLOCK
132 #endif
133
134 static int _tlbpagesize = 4096;
135
136 static int   user_set_flag  = 0;
137
138 static int _checkProgress = 1;             /* check deadlock */
139 static int _detected_hang = 0;
140
141 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
142
143 // dynamic SMSG
144 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
145
146 static int avg_smsg_connection = 32;
147 static int                 *smsg_connected_flag= 0;
148 static gni_smsg_attr_t     **smsg_attr_vector_local;
149 static gni_smsg_attr_t     **smsg_attr_vector_remote;
150 static gni_ep_handle_t     ep_hndl_unbound;
151 static gni_smsg_attr_t     send_smsg_attr;
152 static gni_smsg_attr_t     recv_smsg_attr;
153
154 typedef struct _dynamic_smsg_mailbox{
155    void     *mailbox_base;
156    int      size;
157    int      offset;
158    gni_mem_handle_t  mem_hndl;
159    struct      _dynamic_smsg_mailbox  *next;
160 }dynamic_smsg_mailbox_t;
161
162 static dynamic_smsg_mailbox_t  *mailbox_list;
163
164 int         rdma_id = 0;
165
166 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
167 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
168
169 #if PRINT_SYH
170 int         lrts_send_msg_id = 0;
171 int         lrts_local_done_msg = 0;
172 int         lrts_send_rdma_success = 0;
173 #endif
174
175 #include "machine.h"
176
177 #include "pcqueue.h"
178
179 #include "mempool.h"
180
181 #if CMK_PERSISTENT_COMM
182 #include "machine-persistent.h"
183 #endif
184
185 //#define  USE_ONESIDED 1
186 #ifdef USE_ONESIDED
187 //onesided implementation is wrong, since no place to restore omdh
188 #include "onesided.h"
189 onesided_hnd_t   onesided_hnd;
190 onesided_md_t    omdh;
191 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
192
193 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
194
195 #else
196 uint8_t   onesided_hnd, omdh;
197 #if REMOTE_EVENT
198 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
199          status = GNI_RC_ERROR_NOMEM;} \
200         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
201                 if(status == GNI_RC_SUCCESS) register_memory_size += size;} }
202 #else
203 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
204     do {   \
205         if (register_memory_size + size >= MAX_REG_MEM) { \
206             status = GNI_RC_ERROR_NOMEM; \
207         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
208             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
209     } while(0)
210 #endif
211 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
212     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
213              register_memory_size -= size; \
214          else CmiAbort("MEM_DEregister");  \
215     } while (0)
216 #endif
217
218 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
219 #define   IncreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)++
220 #define   DecreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)--
221 #define   IncreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)++
222 #define   DecreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)--
223 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
224 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
225 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
226 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
227 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
228 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
229 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
230 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
231 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
232
233 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
234 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
235
236 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
237 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
238 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
239 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
240
241 #define ALIGNBUF                64
242
243 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
244 /* If SMSG is not used */
245
246 #define FMA_PER_CORE  1024
247 #define FMA_BUFFER_SIZE 1024
248
249 /* If SMSG is used */
250 static int  SMSG_MAX_MSG = 1024;
251 #define SMSG_MAX_CREDIT 72 
252
253 #define MSGQ_MAXSIZE       2048
254 /* large message transfer with FMA or BTE */
255 #define LRTS_GNI_RDMA_THRESHOLD  1024 
256
257 #if CMK_SMP
258 static int  REMOTE_QUEUE_ENTRIES=163840; 
259 static int LOCAL_QUEUE_ENTRIES=163840; 
260 #else
261 static int  REMOTE_QUEUE_ENTRIES=20480;
262 static int LOCAL_QUEUE_ENTRIES=20480; 
263 #endif
264
265 #define BIG_MSG_TAG  0x26
266 #define PUT_DONE_TAG      0x28
267 #define DIRECT_PUT_DONE_TAG      0x29
268 #define ACK_TAG           0x30
269 /* SMSG is data message */
270 #define SMALL_DATA_TAG          0x31
271 /* SMSG is a control message to initialize a BTE */
272 #define MEDIUM_HEAD_TAG         0x32
273 #define MEDIUM_DATA_TAG         0x33
274 #define LMSG_INIT_TAG           0x39 
275 #define VERY_LMSG_INIT_TAG      0x40 
276 #define VERY_LMSG_TAG           0x41 
277
278 #define DEBUG
279 #ifdef GNI_RC_CHECK
280 #undef GNI_RC_CHECK
281 #endif
282 #ifdef DEBUG
283 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
284 #else
285 #define GNI_RC_CHECK(msg,rc)
286 #endif
287
288 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
289 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
290
291 static int useStaticMSGQ = 0;
292 static int useStaticFMA = 0;
293 static int mysize, myrank;
294 gni_nic_handle_t      nic_hndl;
295
296 typedef struct {
297     gni_mem_handle_t mdh;
298     uint64_t addr;
299 } mdh_addr_t ;
300 // this is related to dynamic SMSG
301
302 typedef struct mdh_addr_list{
303     gni_mem_handle_t mdh;
304    void *addr;
305     struct mdh_addr_list *next;
306 }mdh_addr_list_t;
307
308 static unsigned int         smsg_memlen;
309 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
310 mdh_addr_t          setup_mem;
311 mdh_addr_t          *smsg_connection_vec = 0;
312 gni_mem_handle_t    smsg_connection_memhndl;
313 static int          smsg_expand_slots = 10;
314 static int          smsg_available_slot = 0;
315 static void         *smsg_mailbox_mempool = 0;
316 mdh_addr_list_t     *smsg_dynamic_list = 0;
317
318 static void             *smsg_mailbox_base;
319 gni_msgq_attr_t         msgq_attrs;
320 gni_msgq_handle_t       msgq_handle;
321 gni_msgq_ep_attr_t      msgq_ep_attrs;
322 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
323
324 /* =====Beginning of Declarations of Machine Specific Variables===== */
325 static int cookie;
326 static int modes = 0;
327 static gni_cq_handle_t       smsg_rx_cqh = NULL;
328 static gni_cq_handle_t       smsg_tx_cqh = NULL;
329 static gni_cq_handle_t       post_rx_cqh = NULL;
330 static gni_cq_handle_t       post_tx_cqh = NULL;
331 static gni_ep_handle_t       *ep_hndl_array;
332 #if CMK_SMP && MULTI_THREAD_SEND
333 static CmiNodeLock           *ep_lock_array;
334 static CmiNodeLock           tx_cq_lock; 
335 static CmiNodeLock           rx_cq_lock;
336 static CmiNodeLock           *mempool_lock;
337 #endif
338
339 typedef struct msg_list
340 {
341     uint32_t destNode;
342     uint32_t size;
343     void *msg;
344     uint8_t tag;
345 #if !CMK_SMP
346     struct msg_list *next;
347 #endif
348 }MSG_LIST;
349
350
351 typedef struct control_msg
352 {
353     uint64_t            source_addr;    /* address from the start of buffer  */
354     uint64_t            dest_addr;      /* address from the start of buffer */
355     int                 total_length;   /* total length */
356     int                 length;         /* length of this packet */
357     uint8_t             seq_id;         //big message   0 meaning single message
358     gni_mem_handle_t    source_mem_hndl;
359     struct control_msg *next;
360 } CONTROL_MSG;
361
362 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
363
364 typedef struct ack_msg
365 {
366     uint64_t            source_addr;    /* address from the start of buffer  */
367 #if ! USE_LRTS_MEMPOOL
368     gni_mem_handle_t    source_mem_hndl;
369     int                 length;          /* total length */
370 #endif
371     struct ack_msg     *next;
372 } ACK_MSG;
373
374 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
375
376 #if CMK_DIRECT
377 typedef struct{
378     uint64_t    handler_addr;
379 }CMK_DIRECT_HEADER;
380
381 typedef struct {
382     char core[CmiMsgHeaderSizeBytes];
383     uint64_t handler;
384 }cmidirectMsg;
385
386 //SYH
387 CpvDeclare(int, CmiHandleDirectIdx);
388 void CmiHandleDirectMsg(cmidirectMsg* msg)
389 {
390
391     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
392    (*(_handle->callbackFnPtr))(_handle->callbackData);
393    CmiFree(msg);
394 }
395
396 void CmiDirectInit()
397 {
398     CpvInitialize(int,  CmiHandleDirectIdx);
399     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
400 }
401
402 #endif
403 typedef struct  rmda_msg
404 {
405     int                   destNode;
406     gni_post_descriptor_t *pd;
407 #if !CMK_SMP
408     struct  rmda_msg      *next;
409 #endif
410 }RDMA_REQUEST;
411
412
413 #if CMK_SMP
414 #define SMP_LOCKS               0
415 #define ONE_SEND_QUEUE                  0
416 PCQueue sendRdmaBuf;
417 typedef struct  msg_list_index
418 {
419     PCQueue     sendSmsgBuf;
420     int         pushed;
421     CmiNodeLock   lock;
422 } MSG_LIST_INDEX;
423 char                *destpe_avail;
424 #if  !ONE_SEND_QUEUE && SMP_LOCKS
425     PCQueue     nonEmptyQueues;
426 #endif
427 #else         /* non-smp */
428
429 static RDMA_REQUEST        *sendRdmaBuf = 0;
430 static RDMA_REQUEST        *sendRdmaTail = 0;
431 typedef struct  msg_list_index
432 {
433     int         next;
434     MSG_LIST    *sendSmsgBuf;
435     MSG_LIST    *tail;
436 } MSG_LIST_INDEX;
437
438 #endif
439
440 // buffered send queue
441 #if ! ONE_SEND_QUEUE
442 typedef struct smsg_queue
443 {
444     MSG_LIST_INDEX   *smsg_msglist_index;
445     int               smsg_head_index;
446 } SMSG_QUEUE;
447 #else
448 typedef struct smsg_queue
449 {
450     PCQueue       sendMsgBuf;
451 }  SMSG_QUEUE;
452 #endif
453
454 SMSG_QUEUE                  smsg_queue;
455 SMSG_QUEUE                  smsg_oob_queue;
456
457 #if CMK_SMP
458
459 #define FreeMsgList(d)   free(d);
460 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
461
462 #else
463
464 static MSG_LIST       *msglist_freelist=0;
465
466 #define FreeMsgList(d)  \
467   do { \
468   (d)->next = msglist_freelist;\
469   msglist_freelist = d; \
470   } while (0)
471
472 #define MallocMsgList(d) \
473   do {  \
474   d = msglist_freelist;\
475   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
476              _MEMCHECK(d);\
477   } else msglist_freelist = d->next; \
478   d->next =0;  \
479   } while (0)
480
481 #endif
482
483 #if CMK_SMP
484
485 #define FreeControlMsg(d)      free(d);
486 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
487
488 #else
489
490 static CONTROL_MSG    *control_freelist=0;
491
492 #define FreeControlMsg(d)       \
493   do { \
494   (d)->next = control_freelist;\
495   control_freelist = d; \
496   } while (0);
497
498 #define MallocControlMsg(d) \
499   do {  \
500   d = control_freelist;\
501   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
502              _MEMCHECK(d);\
503   } else control_freelist = d->next;  \
504   } while (0);
505
506 #endif
507
508 #if CMK_SMP
509
510 #define FreeAckMsg(d)      free(d);
511 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
512
513 #else
514
515 static ACK_MSG        *ack_freelist=0;
516
517 #define FreeAckMsg(d)       \
518   do { \
519   (d)->next = ack_freelist;\
520   ack_freelist = d; \
521   } while (0)
522
523 #define MallocAckMsg(d) \
524   do { \
525   d = ack_freelist;\
526   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
527              _MEMCHECK(d);\
528   } else ack_freelist = d->next; \
529   } while (0)
530
531 #endif
532
533
534 # if CMK_SMP
535 #define FreeRdmaRequest(d)       free(d);
536 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
537 #else
538
539 static RDMA_REQUEST         *rdma_freelist = NULL;
540
541 #define FreeRdmaRequest(d)       \
542   do {  \
543   (d)->next = rdma_freelist;\
544   rdma_freelist = d;    \
545   } while (0)
546
547 #define MallocRdmaRequest(d) \
548   do {   \
549   d = rdma_freelist;\
550   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
551              _MEMCHECK(d);\
552   } else rdma_freelist = d->next; \
553   d->next =0;   \
554   } while (0)
555 #endif
556
557 /* reuse gni_post_descriptor_t */
558 static gni_post_descriptor_t *post_freelist=0;
559
560 #if  !CMK_SMP
561 #define FreePostDesc(d)       \
562     (d)->next_descr = post_freelist;\
563     post_freelist = d;
564
565 #define MallocPostDesc(d) \
566   d = post_freelist;\
567   if (d==0) { \
568      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
569      d->next_descr = 0;\
570       _MEMCHECK(d);\
571   } else post_freelist = d->next_descr;
572 #else
573
574 #define FreePostDesc(d)     free(d);
575 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
576
577 #endif
578
579
580 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
581 static int      buffered_smsg_counter = 0;
582
583 /* SmsgSend return success but message sent is not confirmed by remote side */
584 static MSG_LIST *buffered_fma_head = 0;
585 static MSG_LIST *buffered_fma_tail = 0;
586
587 /* functions  */
588 #define IsFree(a,ind)  !( a& (1<<(ind) ))
589 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
590 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
591
592 CpvDeclare(mempool_type*, mempool);
593
594 /* get the upper bound of log 2 */
595 int mylog2(int size)
596 {
597     int op = size;
598     unsigned int ret=0;
599     unsigned int mask = 0;
600     int i;
601     while(op>0)
602     {
603         op = op >> 1;
604         ret++;
605
606     }
607     for(i=1; i<ret; i++)
608     {
609         mask = mask << 1;
610         mask +=1;
611     }
612
613     ret -= ((size &mask) ? 0:1);
614     return ret;
615 }
616
617 static void
618 allgather(void *in,void *out, int len)
619 {
620     static int *ivec_ptr=NULL,already_called=0,job_size=0;
621     int i,rc;
622     int my_rank;
623     char *tmp_buf,*out_ptr;
624
625     if(!already_called) {
626
627         rc = PMI_Get_size(&job_size);
628         CmiAssert(rc == PMI_SUCCESS);
629         rc = PMI_Get_rank(&my_rank);
630         CmiAssert(rc == PMI_SUCCESS);
631
632         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
633         CmiAssert(ivec_ptr != NULL);
634
635         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
636         CmiAssert(rc == PMI_SUCCESS);
637
638         already_called = 1;
639
640     }
641
642     tmp_buf = (char *)malloc(job_size * len);
643     CmiAssert(tmp_buf);
644
645     rc = PMI_Allgather(in,tmp_buf,len);
646     CmiAssert(rc == PMI_SUCCESS);
647
648     out_ptr = out;
649
650     for(i=0;i<job_size;i++) {
651
652         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
653
654     }
655
656     free(tmp_buf);
657 }
658
659 static void
660 allgather_2(void *in,void *out, int len)
661 {
662     //PMI_Allgather is out of order
663     int i,rc, extend_len;
664     int  rank_index;
665     char *out_ptr, *out_ref;
666     char *in2;
667
668     extend_len = sizeof(int) + len;
669     in2 = (char*)malloc(extend_len);
670
671     memcpy(in2, &myrank, sizeof(int));
672     memcpy(in2+sizeof(int), in, len);
673
674     out_ptr = (char*)malloc(mysize*extend_len);
675
676     rc = PMI_Allgather(in2, out_ptr, extend_len);
677     GNI_RC_CHECK("allgather", rc);
678
679     out_ref = out;
680
681     for(i=0;i<mysize;i++) {
682         //rank index 
683         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
684         //copy to the rank index slot
685         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
686     }
687
688     free(out_ptr);
689     free(in2);
690
691 }
692
693 static unsigned int get_gni_nic_address(int device_id)
694 {
695     unsigned int address, cpu_id;
696     gni_return_t status;
697     int i, alps_dev_id=-1,alps_address=-1;
698     char *token, *p_ptr;
699
700     p_ptr = getenv("PMI_GNI_DEV_ID");
701     if (!p_ptr) {
702         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
703        
704         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
705     } else {
706         while ((token = strtok(p_ptr,":")) != NULL) {
707             alps_dev_id = atoi(token);
708             if (alps_dev_id == device_id) {
709                 break;
710             }
711             p_ptr = NULL;
712         }
713         CmiAssert(alps_dev_id != -1);
714         p_ptr = getenv("PMI_GNI_LOC_ADDR");
715         CmiAssert(p_ptr != NULL);
716         i = 0;
717         while ((token = strtok(p_ptr,":")) != NULL) {
718             if (i == alps_dev_id) {
719                 alps_address = atoi(token);
720                 break;
721             }
722             p_ptr = NULL;
723             ++i;
724         }
725         CmiAssert(alps_address != -1);
726         address = alps_address;
727     }
728     return address;
729 }
730
731 static uint8_t get_ptag(void)
732 {
733     char *p_ptr, *token;
734     uint8_t ptag;
735
736     p_ptr = getenv("PMI_GNI_PTAG");
737     CmiAssert(p_ptr != NULL);
738     token = strtok(p_ptr, ":");
739     ptag = (uint8_t)atoi(token);
740     return ptag;
741         
742 }
743
744 static uint32_t get_cookie(void)
745 {
746     uint32_t cookie;
747     char *p_ptr, *token;
748
749     p_ptr = getenv("PMI_GNI_COOKIE");
750     CmiAssert(p_ptr != NULL);
751     token = strtok(p_ptr, ":");
752     cookie = (uint32_t)atoi(token);
753
754     return cookie;
755 }
756
757 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
758 /* TODO: add any that are related */
759 /* =====End of Definitions of Message-Corruption Related Macros=====*/
760
761
762 #include "machine-lrts.h"
763 #include "machine-common-core.c"
764
765 /* Network progress function is used to poll the network when for
766    messages. This flushes receive buffers on some  implementations*/
767 #if CMK_MACHINE_PROGRESS_DEFINED
768 void CmiMachineProgressImpl() {
769 }
770 #endif
771
772 static void SendRdmaMsg();
773 static void PumpNetworkSmsg();
774 static void PumpLocalRdmaTransactions();
775 static int SendBufferMsg(SMSG_QUEUE *queue);
776
777 #if MACHINE_DEBUG_LOG
778 FILE *debugLog = NULL;
779 static CmiInt8 buffered_recv_msg = 0;
780 int         lrts_smsg_success = 0;
781 int         lrts_received_msg = 0;
782 #endif
783
784 static void sweep_mempool(mempool_type *mptr)
785 {
786     block_header *current = &(mptr->block_head);
787
788     printf("[n %d] sweep_mempool slot START.\n", myrank);
789     while( current!= NULL) {
790         printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
791         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
792     }
793     printf("[n %d] sweep_mempool slot END.\n", myrank);
794 }
795
796 inline
797 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
798 {
799     block_header *current = *from;
800
801     //while(register_memory_size>= MAX_REG_MEM)
802     //{
803         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
804             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
805
806         *from = current;
807         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
808         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
809         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
810     //}
811     return GNI_RC_SUCCESS;
812 }
813
814 inline 
815 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
816 {
817     gni_return_t status = GNI_RC_SUCCESS;
818     //int size = GetMempoolsize(msg);
819     //void *blockaddr = GetMempoolBlockPtr(msg);
820     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
821    
822     block_header *current = &(mptr->block_head);
823     while(register_memory_size>= MAX_REG_MEM)
824     {
825         status = deregisterMemory(mptr, &current);
826         if (status != GNI_RC_SUCCESS) break;
827     }
828     if(register_memory_size>= MAX_REG_MEM) return status;
829
830     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
831     while(1)
832     {
833         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
834         if(status == GNI_RC_SUCCESS)
835         {
836             break;
837         }
838         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
839         {
840             CmiAbort("Memory registor for mempool fails\n");
841         }
842         else
843         {
844             status = deregisterMemory(mptr, &current);
845             if (status != GNI_RC_SUCCESS) break;
846         }
847     }; 
848     return status;
849 }
850
851 inline 
852 static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
853 {
854     static int rank = -1;
855     int i;
856     gni_return_t status;
857     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
858     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
859     mempool_type *mptr;
860
861     status = registerFromMempool(mptr1, msg, size, t);
862     if (status == GNI_RC_SUCCESS) return status;
863 #if CMK_SMP 
864     for (i=0; i<CmiMyNodeSize()+1; i++) {
865       rank = (rank+1)%(CmiMyNodeSize()+1);
866       mptr = CpvAccessOther(mempool, rank);
867       if (mptr == mptr1) continue;
868       status = registerFromMempool(mptr, msg, size, t);
869       if (status == GNI_RC_SUCCESS) return status;
870     }
871 #endif
872     return  GNI_RC_ERROR_RESOURCE;
873 }
874
875 inline
876 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
877 {
878     MSG_LIST        *msg_tmp;
879     MallocMsgList(msg_tmp);
880     msg_tmp->destNode = destNode;
881     msg_tmp->size   = size;
882     msg_tmp->msg    = msg;
883     msg_tmp->tag    = tag;
884
885 #if !CMK_SMP
886     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
887         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
888         queue->smsg_head_index = destNode;
889         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
890     }else
891     {
892         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
893         queue->smsg_msglist_index[destNode].tail = msg_tmp;
894     }
895 #else
896 #if ONE_SEND_QUEUE
897     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
898 #else
899 #if SMP_LOCKS
900     CmiLock(queue->smsg_msglist_index[destNode].lock);
901     if(queue->smsg_msglist_index[destNode].pushed == 0)
902     {
903         PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
904     }
905     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
906     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
907 #else
908     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
909 #endif
910 #endif
911 #endif
912 #if PRINT_SYH
913     buffered_smsg_counter++;
914 #endif
915 }
916
917 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
918 {
919     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
920 }
921
922 inline
923 static void setup_smsg_connection(int destNode)
924 {
925     mdh_addr_list_t  *new_entry = 0;
926     gni_post_descriptor_t *pd;
927     gni_smsg_attr_t      *smsg_attr;
928     gni_return_t status = GNI_RC_NOT_DONE;
929     RDMA_REQUEST        *rdma_request_msg;
930     
931     if(smsg_available_slot == smsg_expand_slots)
932     {
933         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
934         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
935         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
936
937         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
938             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
939             GNI_MEM_READWRITE,   
940             -1,
941             &(new_entry->mdh));
942         smsg_available_slot = 0; 
943         new_entry->next = smsg_dynamic_list;
944         smsg_dynamic_list = new_entry;
945     }
946     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
947     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
948     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
949     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
950     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
951     smsg_attr->buff_size = smsg_memlen;
952     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
953     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
954     smsg_local_attr_vec[destNode] = smsg_attr;
955     smsg_available_slot++;
956     MallocPostDesc(pd);
957     pd->type            = GNI_POST_FMA_PUT;
958     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
959     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
960     pd->length          = sizeof(gni_smsg_attr_t);
961     pd->local_addr      = (uint64_t) smsg_attr;
962     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
963     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
964     pd->src_cq_hndl     = 0;
965     pd->rdma_mode       = 0;
966     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
967     print_smsg_attr(smsg_attr);
968     if(status == GNI_RC_ERROR_RESOURCE )
969     {
970         MallocRdmaRequest(rdma_request_msg);
971         rdma_request_msg->destNode = destNode;
972         rdma_request_msg->pd = pd;
973         /* buffer this request */
974     }
975 #if PRINT_SYH
976     if(status != GNI_RC_SUCCESS)
977        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
978     else
979         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
980 #endif
981 }
982
983 /* useDynamicSMSG */
984 inline 
985 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
986 {
987     gni_return_t status = GNI_RC_NOT_DONE;
988
989     if(mailbox_list->offset == mailbox_list->size)
990     {
991         dynamic_smsg_mailbox_t *new_mailbox_entry;
992         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
993         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
994         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
995         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
996         new_mailbox_entry->offset = 0;
997         
998         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
999             new_mailbox_entry->size, smsg_rx_cqh,
1000             GNI_MEM_READWRITE,   
1001             -1,
1002             &(new_mailbox_entry->mem_hndl));
1003
1004         GNI_RC_CHECK("register", status);
1005         new_mailbox_entry->next = mailbox_list;
1006         mailbox_list = new_mailbox_entry;
1007     }
1008     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1009     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1010     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1011     local_smsg_attr->mbox_offset = mailbox_list->offset;
1012     mailbox_list->offset += smsg_memlen;
1013     local_smsg_attr->buff_size = smsg_memlen;
1014     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1015     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1016 }
1017
1018 /* useDynamicSMSG */
1019 inline 
1020 static int connect_to(int destNode)
1021 {
1022     gni_return_t status = GNI_RC_NOT_DONE;
1023     CmiAssert(smsg_connected_flag[destNode] == 0);
1024     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1025     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1026     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1027     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1028     
1029     CMI_GNI_LOCK
1030     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1031     CMI_GNI_UNLOCK
1032     if (status == GNI_RC_ERROR_RESOURCE) {
1033       /* possibly destNode is making connection at the same time */
1034       free(smsg_attr_vector_local[destNode]);
1035       smsg_attr_vector_local[destNode] = NULL;
1036       free(smsg_attr_vector_remote[destNode]);
1037       smsg_attr_vector_remote[destNode] = NULL;
1038       mailbox_list->offset -= smsg_memlen;
1039       return 0;
1040     }
1041     GNI_RC_CHECK("GNI_Post", status);
1042     smsg_connected_flag[destNode] = 1;
1043     return 1;
1044 }
1045
1046 inline 
1047 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
1048 {
1049     unsigned int          remote_address;
1050     uint32_t              remote_id;
1051     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1052     gni_smsg_attr_t       *smsg_attr;
1053     gni_post_descriptor_t *pd;
1054     gni_post_state_t      post_state;
1055     char                  *real_data; 
1056
1057     if (useDynamicSMSG) {
1058         switch (smsg_connected_flag[destNode]) {
1059         case 0: 
1060             connect_to(destNode);         /* continue to case 1 */
1061         case 1:                           /* pending connection, do nothing */
1062             status = GNI_RC_NOT_DONE;
1063             if(inbuff ==0)
1064                 buffer_small_msgs(queue, msg, size, destNode, tag);
1065             return status;
1066         }
1067     }
1068 #if CMK_SMP
1069 #if ! ONE_SEND_QUEUE
1070     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1071 #endif
1072     {
1073 #else
1074     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1075     {
1076 #endif
1077         CMI_GNI_LOCK
1078         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
1079         CMI_GNI_UNLOCK
1080         if(status == GNI_RC_SUCCESS)
1081         {
1082 #if CMK_SMP_TRACE_COMMTHREAD
1083             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1084             { 
1085                 START_EVENT();
1086                 if ( tag == SMALL_DATA_TAG)
1087                     real_data = (char*)msg; 
1088                 else 
1089                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1090                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1091             }
1092 #endif
1093             smsg_send_count ++;
1094             return status;
1095         }else
1096             status = GNI_RC_ERROR_RESOURCE;
1097     }
1098     if(inbuff ==0)
1099         buffer_small_msgs(queue, msg, size, destNode, tag);
1100     return status;
1101 }
1102
1103 inline 
1104 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1105 {
1106     /* construct a control message and send */
1107     CONTROL_MSG         *control_msg_tmp;
1108     MallocControlMsg(control_msg_tmp);
1109     control_msg_tmp->source_addr = (uint64_t)msg;
1110     control_msg_tmp->seq_id    = seqno;
1111     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1112 #if     USE_LRTS_MEMPOOL
1113     if(size < BIG_MSG)
1114     {
1115         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1116     }
1117     else
1118     {
1119         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1120         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1121         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1122     }
1123 #else
1124     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1125 #endif
1126     return control_msg_tmp;
1127 }
1128
1129 #define BLOCKING_SEND_CONTROL    0
1130
1131 // Large message, send control to receiver, receiver register memory and do a GET, 
1132 // return 1 - send no success
1133 inline
1134 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1135 {
1136     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1137     uint32_t            vmdh_index  = -1;
1138     int                 size;
1139     int                 offset = 0;
1140     uint64_t            source_addr;
1141     int                 register_size; 
1142     void                *msg;
1143
1144     size    =   control_msg_tmp->total_length;
1145     source_addr = control_msg_tmp->source_addr;
1146     register_size = control_msg_tmp->length;
1147
1148 #if  USE_LRTS_MEMPOOL
1149     if( control_msg_tmp->seq_id == 0 ){
1150 #if BLOCKING_SEND_CONTROL
1151         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1152             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1153                 LrtsAdvanceCommunication(0);
1154         }
1155 #endif
1156         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1157         {
1158             msg = (void*)source_addr;
1159             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1160             {
1161                 if(!inbuff)
1162                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1163                 return GNI_RC_ERROR_NOMEM;
1164             }
1165             //register the corresponding mempool
1166             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1167             if(status == GNI_RC_SUCCESS)
1168             {
1169                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1170             }
1171         }else
1172         {
1173             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1174             status = GNI_RC_SUCCESS;
1175         }
1176         if(NoMsgInSend( control_msg_tmp->source_addr))
1177             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1178         else
1179             register_size = 0;
1180     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1181     {
1182         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1183         source_addr += offset;
1184         size = control_msg_tmp->length;
1185 #if BLOCKING_SEND_CONTROL
1186         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1187             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1188                 LrtsAdvanceCommunication(0);
1189         }
1190 #endif
1191         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1192             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1193             {
1194                 if(!inbuff)
1195                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1196                 return GNI_RC_ERROR_NOMEM;
1197             }
1198             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1199             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1200         }
1201         else
1202         {
1203             status = GNI_RC_SUCCESS;
1204         }
1205         register_size = 0;  
1206     }
1207
1208     if(status == GNI_RC_SUCCESS)
1209     {
1210         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff);  
1211         if(status == GNI_RC_SUCCESS)
1212         {
1213             buffered_send_msg += register_size;
1214             if(control_msg_tmp->seq_id == 0)
1215             {
1216                 IncreaseMsgInSend(source_addr);
1217             }
1218             FreeControlMsg(control_msg_tmp);
1219             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1220         }else
1221             status = GNI_RC_ERROR_RESOURCE;
1222
1223     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1224     {
1225         CmiAbort("Memory registor for large msg\n");
1226     }else 
1227     {
1228         status = GNI_RC_ERROR_NOMEM; 
1229         if(!inbuff)
1230             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1231     }
1232     return status;
1233 #else
1234     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1235     if(status == GNI_RC_SUCCESS)
1236     {
1237         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0);  
1238         if(status == GNI_RC_SUCCESS)
1239         {
1240             FreeControlMsg(control_msg_tmp);
1241         }
1242     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1243     {
1244         CmiAbort("Memory registor for large msg\n");
1245     }else 
1246     {
1247         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1248     }
1249     return status;
1250 #endif
1251 }
1252
1253 inline void LrtsPrepareEnvelope(char *msg, int size)
1254 {
1255     CmiSetMsgSize(msg, size);
1256 }
1257
1258 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1259 {
1260     gni_return_t        status  =   GNI_RC_SUCCESS;
1261     uint8_t tag;
1262     CONTROL_MSG         *control_msg_tmp;
1263     int                 oob = ( mode & OUT_OF_BAND);
1264     SMSG_QUEUE          *queue;
1265
1266     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1267 #if CMK_USE_OOB
1268     queue = oob? &smsg_oob_queue : &smsg_queue;
1269 #else
1270     queue = &smsg_queue;
1271 #endif
1272
1273     LrtsPrepareEnvelope(msg, size);
1274
1275 #if PRINT_SYH
1276     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1277 #endif 
1278 #if CMK_SMP && COMM_THREAD_SEND
1279     if(size <= SMSG_MAX_MSG)
1280         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1281     else if (size < BIG_MSG) {
1282         control_msg_tmp =  construct_control_msg(size, msg, 0);
1283         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1284     }
1285     else {
1286           CmiSetMsgSeq(msg, 0);
1287           control_msg_tmp =  construct_control_msg(size, msg, 1);
1288           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1289     }
1290 #else   //non-smp, smp(worker sending)
1291     if(size <= SMSG_MAX_MSG)
1292     {
1293         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1294             CmiFree(msg);
1295     }
1296     else if (size < BIG_MSG) {
1297         control_msg_tmp =  construct_control_msg(size, msg, 0);
1298         send_large_messages(queue, destNode, control_msg_tmp, 0);
1299     }
1300     else {
1301 #if     USE_LRTS_MEMPOOL
1302         CmiSetMsgSeq(msg, 0);
1303         control_msg_tmp =  construct_control_msg(size, msg, 1);
1304         send_large_messages(queue, destNode, control_msg_tmp, 0);
1305 #else
1306         control_msg_tmp =  construct_control_msg(size, msg, 0);
1307         send_large_messages(queue, destNode, control_msg_tmp, 0);
1308 #endif
1309     }
1310 #endif
1311     return 0;
1312 }
1313
1314 static void    PumpDatagramConnection();
1315 static void registerUserTraceEvents() {
1316 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1317     traceRegisterUserEvent("setting up connections", 10);
1318     traceRegisterUserEvent("Receiving small msgs", 20);
1319     traceRegisterUserEvent("Release local transaction", 30);
1320     traceRegisterUserEvent("Sending buffered small msgs", 40);
1321     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1322 #endif
1323 }
1324
1325 static void ProcessDeadlock()
1326 {
1327     static CmiUInt8 *ptr = NULL;
1328     static CmiUInt8  last = 0, mysum, sum;
1329     static int count = 0;
1330     gni_return_t status;
1331     int i;
1332
1333 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1334 //sweep_mempool(CpvAccess(mempool));
1335     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1336     mysum = smsg_send_count + smsg_recv_count;
1337     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1338     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1339     GNI_RC_CHECK("PMI_Allgather", status);
1340     sum = 0;
1341     for (i=0; i<mysize; i++)  sum+= ptr[i];
1342     if (last == 0 || sum == last) 
1343         count++;
1344     else
1345         count = 0;
1346     last = sum;
1347     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1348     if (count == 2) { 
1349         /* detected twice, it is a real deadlock */
1350         if (myrank == 0)  {
1351             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %d and %d).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1352             CmiAbort("Fatal> Deadlock detected.");
1353         }
1354
1355     }
1356     _detected_hang = 0;
1357 }
1358
1359 static void CheckProgress()
1360 {
1361     if (smsg_send_count == last_smsg_send_count &&
1362         smsg_recv_count == last_smsg_recv_count ) 
1363     {
1364         _detected_hang = 1;
1365 #if !CMK_SMP
1366         if (_detected_hang) ProcessDeadlock();
1367 #endif
1368
1369     }
1370     else {
1371         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1372         last_smsg_send_count = smsg_send_count;
1373         last_smsg_recv_count = smsg_recv_count;
1374         _detected_hang = 0;
1375     }
1376 }
1377
1378 static void set_limit()
1379 {
1380     if (!user_set_flag && CmiMyRank() == 0) {
1381         int mynode = CmiPhysicalNodeID(CmiMyPe());
1382         int numpes = CmiNumPesOnPhysicalNode(mynode);
1383         int numprocesses = numpes / CmiMyNodeSize();
1384         MAX_REG_MEM  = _totalmem / numprocesses;
1385         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1386         if (CmiMyPe() == 0)
1387            printf("mem_max = %d, send_max =%d\n", MAX_REG_MEM, MAX_BUFF_SEND);
1388     }
1389 }
1390
1391 void LrtsPostCommonInit(int everReturn)
1392 {
1393 #if CMK_DIRECT
1394     CmiDirectInit();
1395 #endif
1396 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1397     CpvInitialize(double, projTraceStart);
1398     /* only PE 0 needs to care about registration (to generate sts file). */
1399     if (CmiMyPe() == 0) {
1400         registerMachineUserEventsFunction(&registerUserTraceEvents);
1401     }
1402 #endif
1403
1404 #if CMK_SMP
1405     CmiIdleState *s=CmiNotifyGetState();
1406     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1407     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1408 #else
1409     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1410     if (useDynamicSMSG)
1411     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1412 #endif
1413
1414     if (_checkProgress)
1415 #if CMK_SMP
1416     if (CmiMyRank() == 0)
1417 #endif
1418     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1419
1420     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1421 }
1422
1423 /* this is called by worker thread */
1424 void LrtsPostNonLocal(){
1425 #if CMK_SMP
1426 #if MULTI_THREAD_SEND
1427     if(mysize == 1) return;
1428     PumpLocalRdmaTransactions();
1429 #if CMK_USE_OOB
1430     if (SendBufferMsg(&smsg_oob_queue) == 1)
1431 #endif
1432     SendBufferMsg(&smsg_queue);
1433     SendRdmaMsg();
1434 #endif
1435 #endif
1436 }
1437
1438 /* useDynamicSMSG */
1439 static void    PumpDatagramConnection()
1440 {
1441     uint32_t          remote_address;
1442     uint32_t          remote_id;
1443     gni_return_t status;
1444     gni_post_state_t  post_state;
1445     uint64_t          datagram_id;
1446     int i;
1447
1448    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1449    {
1450        if (datagram_id >= mysize) {           /* bound endpoint */
1451            int pe = datagram_id - mysize;
1452            CMI_GNI_LOCK
1453            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1454            CMI_GNI_UNLOCK
1455            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1456            {
1457                CmiAssert(remote_id == pe);
1458                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1459                GNI_RC_CHECK("Dynamic SMSG Init", status);
1460 #if PRINT_SYH
1461                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1462 #endif
1463                CmiAssert(smsg_connected_flag[pe] == 1);
1464                smsg_connected_flag[pe] = 2;
1465            }
1466        }
1467        else {         /* unbound ep */
1468            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1469            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1470            {
1471                CmiAssert(remote_id<mysize);
1472                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1473                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1474                GNI_RC_CHECK("Dynamic SMSG Init", status);
1475 #if PRINT_SYH
1476                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1477 #endif
1478                smsg_connected_flag[remote_id] = 2;
1479
1480                alloc_smsg_attr(&send_smsg_attr);
1481                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1482                GNI_RC_CHECK("post unbound datagram", status);
1483            }
1484        }
1485    }
1486 }
1487
1488 /* pooling CQ to receive network message */
1489 static void PumpNetworkRdmaMsgs()
1490 {
1491     gni_cq_entry_t      event_data;
1492     gni_return_t        status;
1493
1494 }
1495
1496 inline 
1497 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1498 {
1499     RDMA_REQUEST        *rdma_request_msg;
1500     MallocRdmaRequest(rdma_request_msg);
1501     rdma_request_msg->destNode = inst_id;
1502     rdma_request_msg->pd = pd;
1503 #if CMK_SMP
1504     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1505 #else
1506     if(sendRdmaBuf == 0)
1507     {
1508         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1509     }else{
1510         sendRdmaTail->next = rdma_request_msg;
1511         sendRdmaTail =  rdma_request_msg;
1512     }
1513 #endif
1514
1515 }
1516 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1517 static void PumpNetworkSmsg()
1518 {
1519     uint64_t            inst_id;
1520     int                 ret;
1521     gni_cq_entry_t      event_data;
1522     gni_return_t        status, status2;
1523     void                *header;
1524     uint8_t             msg_tag;
1525     int                 msg_nbytes;
1526     void                *msg_data;
1527     gni_mem_handle_t    msg_mem_hndl;
1528     gni_smsg_attr_t     *smsg_attr;
1529     gni_smsg_attr_t     *remote_smsg_attr;
1530     int                 init_flag;
1531     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1532     uint64_t            source_addr;
1533     SMSG_QUEUE         *queue = &smsg_queue;
1534 #if     CMK_DIRECT
1535     cmidirectMsg        *direct_msg;
1536 #endif
1537     while(1)
1538     {
1539         CMI_GNI_LOCK
1540         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1541         CMI_GNI_UNLOCK
1542         if(status != GNI_RC_SUCCESS)
1543             break;
1544         inst_id = GNI_CQ_GET_INST_ID(event_data);
1545         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1546 #if PRINT_SYH
1547         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1548 #endif
1549         if (useDynamicSMSG) {
1550             /* subtle: smsg may come before connection is setup */
1551             while (smsg_connected_flag[inst_id] != 2) 
1552                PumpDatagramConnection();
1553         }
1554         msg_tag = GNI_SMSG_ANY_TAG;
1555         while(1) {
1556             CMI_GNI_LOCK
1557             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1558             CMI_GNI_UNLOCK
1559             if (status != GNI_RC_SUCCESS)
1560                 break;
1561 #if PRINT_SYH
1562             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1563 #endif
1564             /* copy msg out and then put into queue (small message) */
1565             switch (msg_tag) {
1566             case SMALL_DATA_TAG:
1567             {
1568                 START_EVENT();
1569                 msg_nbytes = CmiGetMsgSize(header);
1570                 msg_data    = CmiAlloc(msg_nbytes);
1571                 memcpy(msg_data, (char*)header, msg_nbytes);
1572 #if CMK_SMP_TRACE_COMMTHREAD
1573                 TRACE_COMM_RECV(CpvAccess(projTraceStart), msg_data);
1574 #endif
1575                 handleOneRecvedMsg(msg_nbytes, msg_data);
1576                 break;
1577             }
1578             case LMSG_INIT_TAG:
1579             {
1580                 getLargeMsgRequest(header, inst_id);
1581                 break;
1582             }
1583             case ACK_TAG:   //msg fit into mempool
1584             {
1585                 /* Get is done, release message . Now put is not used yet*/
1586                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1587 #if ! USE_LRTS_MEMPOOL
1588                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1589 #else
1590                 DecreaseMsgInSend(msg);
1591 #endif
1592                 if(NoMsgInSend(msg))
1593                     buffered_send_msg -= GetMempoolsize(msg);
1594                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1595                 CmiFree(msg);
1596                 break;
1597             }
1598             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1599             {
1600                 header_tmp = (CONTROL_MSG *) header;
1601                 void *msg = (void*)(header_tmp->source_addr);
1602                 int cur_seq = CmiGetMsgSeq(msg);
1603                 int offset = ONE_SEG*(cur_seq+1);
1604                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1605                 buffered_send_msg -= header_tmp->length;
1606                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1607                 if (remain_size < 0) remain_size = 0;
1608                 CmiSetMsgSize(msg, remain_size);
1609                 if(remain_size <= 0) //transaction done
1610                 {
1611                     CmiFree(msg);
1612                 }else if (header_tmp->total_length > offset)
1613                 {
1614                     CmiSetMsgSeq(msg, cur_seq+1);
1615                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1616                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1617                     //send next seg
1618                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1619                          // pipelining
1620                     if (header_tmp->seq_id == 1) {
1621                       int i;
1622                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1623                         int seq = cur_seq+i+2;
1624                         CmiSetMsgSeq(msg, seq-1);
1625                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1626                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1627                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1628                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1629                       }
1630                     }
1631                 }
1632                 break;
1633             }
1634 #if CMK_PERSISTENT_COMM
1635             case PUT_DONE_TAG: //persistent message
1636                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1637                 int size = ((CONTROL_MSG *) header)->length;
1638                 CmiReference(msg);
1639                 handleOneRecvedMsg(size, msg); 
1640                 break;
1641 #endif
1642 #if CMK_DIRECT
1643             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1644                 //create a trigger message
1645                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
1646                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
1647                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
1648                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
1649                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1650                 break;
1651 #endif
1652             default: {
1653                 printf("weird tag problem\n");
1654                 CmiAbort("Unknown tag\n");
1655                      }
1656             }               // end switch
1657 #if PRINT_SYH
1658             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1659 #endif
1660             CMI_GNI_LOCK
1661             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1662             CMI_GNI_UNLOCK
1663             smsg_recv_count ++;
1664             msg_tag = GNI_SMSG_ANY_TAG;
1665         } //endwhile getNext
1666     }   //end while GetEvent
1667     if(status == GNI_RC_ERROR_RESOURCE)
1668     {
1669         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
1670         GNI_RC_CHECK("Smsg_rx_cq full", status);
1671     }
1672 }
1673
1674 static void printDesc(gni_post_descriptor_t *pd)
1675 {
1676     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1677 }
1678
1679 // for BIG_MSG called on receiver side for receiving control message
1680 // LMSG_INIT_TAG
1681 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1682 {
1683 #if     USE_LRTS_MEMPOOL
1684     CONTROL_MSG         *request_msg;
1685     gni_return_t        status = GNI_RC_SUCCESS;
1686     void                *msg_data;
1687     gni_post_descriptor_t *pd;
1688     gni_mem_handle_t    msg_mem_hndl;
1689     int source, size, transaction_size, offset = 0;
1690     int     register_size = 0;
1691
1692     // initial a get to transfer data from the sender side */
1693     request_msg = (CONTROL_MSG *) header;
1694     size = request_msg->total_length;
1695     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1696     if(request_msg->seq_id < 2)   {
1697         msg_data = CmiAlloc(size);
1698         CmiSetMsgSeq(msg_data, 0);
1699         _MEMCHECK(msg_data);
1700     }
1701     else {
1702         offset = ONE_SEG*(request_msg->seq_id-1);
1703         msg_data = (char*)request_msg->dest_addr + offset;
1704     }
1705    
1706     MallocPostDesc(pd);
1707     pd->cqwrite_value = request_msg->seq_id;
1708     if( request_msg->seq_id == 0)
1709     {
1710         pd->local_mem_hndl= GetMemHndl(msg_data);
1711         transaction_size = ALIGN64(size);
1712         if(IsMemHndlZero(pd->local_mem_hndl))
1713         {   
1714             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1715             if(status == GNI_RC_SUCCESS)
1716             {
1717                 pd->local_mem_hndl = GetMemHndl(msg_data);
1718             }
1719             else
1720             {
1721                 SetMemHndlZero(pd->local_mem_hndl);
1722             }
1723         }
1724         if(NoMsgInRecv( (void*)(msg_data)))
1725             register_size = GetMempoolsize((void*)(msg_data));
1726         else
1727             register_size = 0;
1728     }
1729     else{
1730         transaction_size = ALIGN64(request_msg->length);
1731         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1732         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1733         {
1734             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1735         }
1736     }
1737     pd->first_operand = ALIGN64(size);                   //  total length
1738
1739     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1740         pd->type            = GNI_POST_FMA_GET;
1741     else
1742         pd->type            = GNI_POST_RDMA_GET;
1743 #if REMOTE_EVENT
1744     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1745 #else
1746     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1747 #endif
1748     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1749     pd->length          = transaction_size;
1750     pd->local_addr      = (uint64_t) msg_data;
1751     pd->remote_addr     = request_msg->source_addr + offset;
1752     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1753     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1754     pd->rdma_mode       = 0;
1755     pd->amo_cmd         = 0;
1756
1757     //memory registration success
1758     if(status == GNI_RC_SUCCESS)
1759     {
1760         CMI_GNI_LOCK
1761         if(pd->type == GNI_POST_RDMA_GET) 
1762             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1763         else
1764             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1765         CMI_GNI_UNLOCK
1766
1767         if(status == GNI_RC_SUCCESS )
1768         {
1769             if(pd->cqwrite_value == 0)
1770             {
1771 #if MACHINE_DEBUG_LOG
1772                 buffered_recv_msg += register_size;
1773                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1774 #endif
1775                 IncreaseMsgInRecv(msg_data);
1776             }
1777         }
1778     }else
1779     {
1780         SetMemHndlZero((pd->local_mem_hndl));
1781     }
1782     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1783     {
1784         bufferRdmaMsg(inst_id, pd); 
1785     }else {
1786          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1787         GNI_RC_CHECK("GetLargeAFter posting", status);
1788     }
1789 #else
1790     CONTROL_MSG         *request_msg;
1791     gni_return_t        status;
1792     void                *msg_data;
1793     gni_post_descriptor_t *pd;
1794     RDMA_REQUEST        *rdma_request_msg;
1795     gni_mem_handle_t    msg_mem_hndl;
1796     //int source;
1797     // initial a get to transfer data from the sender side */
1798     request_msg = (CONTROL_MSG *) header;
1799     msg_data = CmiAlloc(request_msg->length);
1800     _MEMCHECK(msg_data);
1801
1802     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1803
1804     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1805     {
1806         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1807     }
1808
1809     MallocPostDesc(pd);
1810     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1811         pd->type            = GNI_POST_FMA_GET;
1812     else
1813         pd->type            = GNI_POST_RDMA_GET;
1814 #if REMOTE_EVENT
1815     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1816 #else
1817     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1818 #endif
1819     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1820     pd->length          = ALIGN64(request_msg->length);
1821     pd->local_addr      = (uint64_t) msg_data;
1822     pd->remote_addr     = request_msg->source_addr;
1823     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1824     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1825     pd->rdma_mode       = 0;
1826     pd->amo_cmd         = 0;
1827
1828     //memory registration successful
1829     if(status == GNI_RC_SUCCESS)
1830     {
1831         pd->local_mem_hndl  = msg_mem_hndl;
1832         CMI_GNI_LOCK
1833         if(pd->type == GNI_POST_RDMA_GET) 
1834             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1835         else
1836             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1837         CMI_GNI_UNLOCK
1838     }else
1839     {
1840         SetMemHndlZero(pd->local_mem_hndl);
1841     }
1842     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1843     {
1844         MallocRdmaRequest(rdma_request_msg);
1845         rdma_request_msg->next = 0;
1846         rdma_request_msg->destNode = inst_id;
1847         rdma_request_msg->pd = pd;
1848         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1849     }else {
1850         GNI_RC_CHECK("AFter posting", status);
1851     }
1852 #endif
1853 }
1854
1855 static void PumpLocalRdmaTransactions()
1856 {
1857     gni_cq_entry_t          ev;
1858     gni_return_t            status;
1859     uint64_t                type, inst_id;
1860     gni_post_descriptor_t   *tmp_pd;
1861     MSG_LIST                *ptr;
1862     CONTROL_MSG             *ack_msg_tmp;
1863     ACK_MSG                 *ack_msg;
1864     uint8_t                 msg_tag;
1865 #if CMK_DIRECT
1866     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1867 #endif
1868     SMSG_QUEUE         *queue = &smsg_queue;
1869
1870     while(1) {
1871         CMI_GNI_LOCK 
1872         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1873         CMI_GNI_UNLOCK
1874         if(status != GNI_RC_SUCCESS) break;
1875         
1876         type = GNI_CQ_GET_TYPE(ev);
1877         if (type == GNI_CQ_EVENT_TYPE_POST)
1878         {
1879             inst_id     = GNI_CQ_GET_INST_ID(ev);
1880 #if PRINT_SYH
1881             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1882 #endif
1883             CMI_GNI_LOCK
1884             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1885             CMI_GNI_UNLOCK
1886
1887             switch (tmp_pd->type) {
1888 #if CMK_PERSISTENT_COMM || CMK_DIRECT
1889             case GNI_POST_RDMA_PUT:
1890 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
1891                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1892 #endif
1893             case GNI_POST_FMA_PUT:
1894                 if(tmp_pd->amo_cmd == 1) {
1895                     //sender ACK to receiver to trigger it is done
1896                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1897                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
1898                     msg_tag = DIRECT_PUT_DONE_TAG;
1899                 }
1900                 else {
1901                     CmiFree((void *)tmp_pd->local_addr);
1902                     MallocControlMsg(ack_msg_tmp);
1903                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1904                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1905                     msg_tag = PUT_DONE_TAG;
1906                 }
1907                 break;
1908 #endif
1909             case GNI_POST_RDMA_GET:
1910             case GNI_POST_FMA_GET:  {
1911 #if  ! USE_LRTS_MEMPOOL
1912                 MallocControlMsg(ack_msg_tmp);
1913                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1914                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1915                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1916                 msg_tag = ACK_TAG;  
1917 #else
1918                 int seq_id = tmp_pd->cqwrite_value;
1919                 if(seq_id > 0)      // BIG_MSG
1920                 {
1921                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1922                     MallocControlMsg(ack_msg_tmp);
1923                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1924                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1925                     ack_msg_tmp->seq_id = seq_id;
1926                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1927                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1928                     ack_msg_tmp->length = tmp_pd->length;
1929                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1930                     msg_tag = BIG_MSG_TAG; 
1931                 } 
1932                 else
1933                 {
1934                     MallocAckMsg(ack_msg);
1935                     ack_msg->source_addr = tmp_pd->remote_addr;
1936                     msg_tag = ACK_TAG;  
1937                     // ack_msg_tmp->dest_addr = tmp_pd->local_addr; ???
1938                 }
1939 #endif
1940                 break;
1941             }
1942             default:
1943                 CmiPrintf("type=%d\n", tmp_pd->type);
1944                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1945             }      /* end of switch */
1946
1947 #if CMK_DIRECT
1948             if (tmp_pd->amo_cmd == 1) {
1949                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1950                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
1951             }
1952             else
1953 #endif
1954             if (msg_tag == ACK_TAG) {
1955                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
1956                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
1957             }
1958             else {
1959                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
1960                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
1961             }
1962 #if CMK_PERSISTENT_COMM
1963             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1964 #endif
1965             {
1966                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1967 #if PRINT_SYH
1968                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1969 #endif
1970 #if CMK_SMP_TRACE_COMMTHREAD
1971                     START_EVENT();
1972 #endif
1973                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1974                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1975 #if MACHINE_DEBUG_LOG
1976                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1977                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1978                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1979 #endif
1980 #if CMK_SMP_TRACE_COMMTHREAD
1981                     TRACE_COMM_RECV(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1982 #endif
1983                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1984                 }else if(msg_tag == BIG_MSG_TAG){
1985                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1986                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1987                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1988 #if CMK_SMP_TRACE_COMMTHREAD
1989                         START_EVENT();
1990 #endif
1991 #if PRINT_SYH
1992                         printf("Pipeline msg done [%d]\n", myrank);
1993 #endif
1994 #if CMK_SMP_TRACE_COMMTHREAD
1995                         TRACE_COMM_RECV(CpvAccess(projTraceStart), msg);
1996 #endif
1997                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1998                     }
1999                 }
2000             }
2001             FreePostDesc(tmp_pd);
2002         }
2003     } //end while
2004     if(status == GNI_RC_ERROR_RESOURCE)
2005     {
2006         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
2007         GNI_RC_CHECK("Smsg_tx_cq full", status);
2008     }
2009 }
2010
2011 static void  SendRdmaMsg()
2012 {
2013     gni_return_t            status = GNI_RC_SUCCESS;
2014     gni_mem_handle_t        msg_mem_hndl;
2015     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
2016     RDMA_REQUEST            *pre = 0;
2017     int i, register_size = 0;
2018     void                    *msg;
2019 #if CMK_SMP
2020     int len = PCQueueLength(sendRdmaBuf);
2021     for (i=0; i<len; i++)
2022     {
2023         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2024         if (ptr == NULL) break;
2025 #else
2026     ptr = sendRdmaBuf;
2027     while (ptr!=0)
2028     {
2029 #endif 
2030         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2031         gni_post_descriptor_t *pd = ptr->pd;
2032         status = GNI_RC_SUCCESS;
2033         
2034         if(pd->cqwrite_value == 0)
2035         {
2036             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2037             {
2038                 msg = (void*)(pd->local_addr);
2039                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
2040                 if(status == GNI_RC_SUCCESS)
2041                 {
2042                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2043                 }
2044             }else
2045             {
2046                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2047             }
2048             if(NoMsgInRecv( (void*)(pd->local_addr)))
2049                 register_size = GetMempoolsize((void*)(pd->local_addr));
2050             else
2051                 register_size = 0;
2052         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2053         {
2054             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
2055         }
2056         if(status == GNI_RC_SUCCESS)        //mem register good
2057         {
2058             CMI_GNI_LOCK
2059             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2060                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2061             else
2062                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2063             CMI_GNI_UNLOCK
2064             if(status == GNI_RC_SUCCESS)    //post good
2065             {
2066 #if !CMK_SMP
2067                 tmp_ptr = ptr;
2068                 if(pre != 0) {
2069                     pre->next = ptr->next;
2070                 }
2071                 else {
2072                     sendRdmaBuf = ptr->next;
2073                 }
2074                 ptr = ptr->next;
2075                 FreeRdmaRequest(tmp_ptr);
2076 #endif
2077                 if(pd->cqwrite_value == 0)
2078                 {
2079                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2080                 }
2081 #if MACHINE_DEBUG_LOG
2082                 buffered_recv_msg += register_size;
2083                 MACHSTATE(8, "GO request from buffered\n"); 
2084 #endif
2085             }else           // cannot post
2086             {
2087 #if CMK_SMP
2088                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2089 #else
2090                 pre = ptr;
2091                 ptr = ptr->next;
2092 #endif
2093                 break;
2094             }
2095         } else          //memory registration fails
2096         {
2097 #if CMK_SMP
2098             PCQueuePush(sendRdmaBuf, (char*)ptr);
2099 #else
2100             pre = ptr;
2101             ptr = ptr->next;
2102 #endif
2103         }
2104     } //end while
2105 #if ! CMK_SMP
2106     if(ptr == 0)
2107         sendRdmaTail = pre;
2108 #endif
2109 }
2110
2111 // return 1 if all messages are sent
2112 static int SendBufferMsg(SMSG_QUEUE *queue)
2113 {
2114     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2115     CONTROL_MSG         *control_msg_tmp;
2116     gni_return_t        status;
2117     int                 done = 1;
2118     int                 register_size;
2119     void                *register_addr;
2120     int                 index_previous = -1;
2121 #if CMI_EXERT_SEND_CAP
2122     int                 sent_cnt = 0;
2123 #endif
2124
2125 #if CMK_SMP
2126     int          index = 0;
2127 #if ONE_SEND_QUEUE
2128     memset(destpe_avail, 0, mysize * sizeof(char));
2129     for (index=0; index<1; index++)
2130     {
2131         int i, len = PCQueueLength(queue->sendMsgBuf);
2132         for (i=0; i<len; i++) 
2133         {
2134             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2135             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
2136                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2137                 continue;
2138             }
2139 #else
2140 #if SMP_LOCKS
2141     int nonempty = PCQueueLength(nonEmptyQueues);
2142     for(index =0; index<nonempty; index++)
2143     {
2144         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2145         PCQueue current_queue= current_list-> sendSmsgBuf;
2146         CmiLock(current_list->lock);
2147         int i, len = PCQueueLength(current_queue);
2148         current_list->pushed = 0;
2149         CmiUnlock(current_list->lock);
2150 #else
2151     for(index =0; index<mysize; index++)
2152     {
2153         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2154         int i, len = PCQueueLength(current_queue);
2155 #endif
2156         for (i=0; i<len; i++) 
2157         {
2158             ptr = (MSG_LIST*)PCQueuePop(current_queue);
2159             if (ptr == 0) break;
2160 #endif
2161 #else
2162     int index = queue->smsg_head_index;
2163     while(index != -1)
2164     {
2165         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2166         pre = 0;
2167         while(ptr != 0)
2168         {
2169 #endif
2170             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2171             status = GNI_RC_ERROR_RESOURCE;
2172             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2173                 /* connection not exists yet */
2174             }
2175             else
2176             switch(ptr->tag)
2177             {
2178             case SMALL_DATA_TAG:
2179                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2180                 if(status == GNI_RC_SUCCESS)
2181                 {
2182                     CmiFree(ptr->msg);
2183                 }
2184                 break;
2185             case LMSG_INIT_TAG:
2186                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2187                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2188                 break;
2189             case ACK_TAG:
2190                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2191                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2192                 break;
2193             case BIG_MSG_TAG:
2194                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);  
2195                 if(status == GNI_RC_SUCCESS)
2196                 {
2197                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2198                 }
2199                 break;
2200 #if CMK_DIRECT
2201             case DIRECT_PUT_DONE_TAG:
2202                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2203                 if(status == GNI_RC_SUCCESS)
2204                 {
2205                     free((CMK_DIRECT_HEADER*)ptr->msg);
2206                 }
2207                 break;
2208
2209 #endif
2210             default:
2211                 printf("Weird tag\n");
2212                 CmiAbort("should not happen\n");
2213             }       // end switch
2214             if(status == GNI_RC_SUCCESS)
2215             {
2216 #if !CMK_SMP
2217                 tmp_ptr = ptr;
2218                 if(pre)
2219                 {
2220                     ptr = pre ->next = ptr->next;
2221                 }else
2222                 {
2223                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2224                 }
2225                 FreeMsgList(tmp_ptr);
2226 #else
2227                 FreeMsgList(ptr);
2228 #endif
2229 #if PRINT_SYH
2230                 buffered_smsg_counter--;
2231                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2232 #endif
2233 #if CMI_EXERT_SEND_CAP
2234                 sent_cnt++;
2235                 if(sent_cnt == SEND_CAP)
2236                     break;
2237 #endif
2238             }else {
2239 #if CMK_SMP
2240 #if ONE_SEND_QUEUE
2241                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2242 #else
2243                 PCQueuePush(current_queue, (char*)ptr);
2244 #endif
2245 #else
2246                 pre = ptr;
2247                 ptr=ptr->next;
2248 #endif
2249                 done = 0;
2250                 if(status == GNI_RC_ERROR_RESOURCE)
2251                 {
2252 #if CMK_SMP && ONE_SEND_QUEUE 
2253                     destpe_avail[ptr->destNode] = 1;
2254 #else
2255                     break;
2256 #endif
2257                 }
2258             } 
2259         } //end while
2260 #if !CMK_SMP
2261         if(ptr == 0)
2262             queue->smsg_msglist_index[index].tail = pre;
2263         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2264         {
2265             if(index_previous != -1)
2266                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2267             else
2268                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2269         }
2270         else {
2271             index_previous = index;
2272         }
2273         index = queue->smsg_msglist_index[index].next;
2274 #else
2275 #if !ONE_SEND_QUEUE && SMP_LOCKS
2276         CmiLock(current_list->lock);
2277         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2278         {
2279             current_list->pushed = 1;
2280             PCQueuePush(nonEmptyQueues, current_list);
2281         }
2282         CmiUnlock(current_list->lock); 
2283 #endif
2284 #endif
2285
2286 #if CMI_EXERT_SEND_CAP
2287         if(sent_cnt == SEND_CAP)
2288                 break;
2289 #endif
2290     }   // end pooling for all cores
2291     return done;
2292 }
2293
2294 static void ProcessDeadlock();
2295 void LrtsAdvanceCommunication(int whileidle)
2296 {
2297     /*  Receive Msg first */
2298 #if CMK_SMP_TRACE_COMMTHREAD
2299     double startT, endT;
2300 #endif
2301     if (useDynamicSMSG && whileidle)
2302     {
2303 #if CMK_SMP_TRACE_COMMTHREAD
2304         startT = CmiWallTimer();
2305 #endif
2306         PumpDatagramConnection();
2307 #if CMK_SMP_TRACE_COMMTHREAD
2308         endT = CmiWallTimer();
2309         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2310 #endif
2311     }
2312
2313 #if CMK_SMP_TRACE_COMMTHREAD
2314     startT = CmiWallTimer();
2315 #endif
2316     PumpNetworkSmsg();
2317     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2318 #if CMK_SMP_TRACE_COMMTHREAD
2319     endT = CmiWallTimer();
2320     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2321 #endif
2322
2323 #if CMK_SMP_TRACE_COMMTHREAD
2324     startT = CmiWallTimer();
2325 #endif
2326     PumpLocalRdmaTransactions();
2327     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2328 #if CMK_SMP_TRACE_COMMTHREAD
2329     endT = CmiWallTimer();
2330     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2331 #endif
2332     /* Send buffered Message */
2333 #if CMK_SMP_TRACE_COMMTHREAD
2334     startT = CmiWallTimer();
2335 #endif
2336 #if CMK_USE_OOB
2337     if (SendBufferMsg(&smsg_oob_queue) == 1)
2338 #endif
2339     SendBufferMsg(&smsg_queue);
2340     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2341 #if CMK_SMP_TRACE_COMMTHREAD
2342     endT = CmiWallTimer();
2343     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2344 #endif
2345
2346 #if CMK_SMP_TRACE_COMMTHREAD
2347     startT = CmiWallTimer();
2348 #endif
2349     SendRdmaMsg();
2350     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2351 #if CMK_SMP_TRACE_COMMTHREAD
2352     endT = CmiWallTimer();
2353     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2354 #endif
2355
2356 #if CMK_SMP
2357     if (_detected_hang)  ProcessDeadlock();
2358 #endif
2359 }
2360
2361 /* useDynamicSMSG */
2362 static void _init_dynamic_smsg()
2363 {
2364     gni_return_t status;
2365     uint32_t     vmdh_index = -1;
2366     int i;
2367
2368     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2369     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2370     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2371     for(i=0; i<mysize; i++) {
2372         smsg_connected_flag[i] = 0;
2373         smsg_attr_vector_local[i] = NULL;
2374         smsg_attr_vector_remote[i] = NULL;
2375     }
2376     if(mysize <=512)
2377     {
2378         SMSG_MAX_MSG = 4096;
2379     }else if (mysize <= 4096)
2380     {
2381         SMSG_MAX_MSG = 4096/mysize * 1024;
2382     }else if (mysize <= 16384)
2383     {
2384         SMSG_MAX_MSG = 512;
2385     }else {
2386         SMSG_MAX_MSG = 256;
2387     }
2388
2389     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2390     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2391     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2392     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2393     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2394
2395     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2396     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2397     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2398     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2399     mailbox_list->offset = 0;
2400     mailbox_list->next = 0;
2401     
2402     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2403         mailbox_list->size, smsg_rx_cqh,
2404         GNI_MEM_READWRITE,   
2405         vmdh_index,
2406         &(mailbox_list->mem_hndl));
2407     GNI_RC_CHECK("MEMORY registration for smsg", status);
2408
2409     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2410     GNI_RC_CHECK("Unbound EP", status);
2411     
2412     alloc_smsg_attr(&send_smsg_attr);
2413
2414     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2415     GNI_RC_CHECK("post unbound datagram", status);
2416
2417       /* always pre-connect to proc 0 */
2418     //if (myrank != 0) connect_to(0);
2419 }
2420
2421 static void _init_static_smsg()
2422 {
2423     gni_smsg_attr_t      *smsg_attr;
2424     gni_smsg_attr_t      remote_smsg_attr;
2425     gni_smsg_attr_t      *smsg_attr_vec;
2426     gni_mem_handle_t     my_smsg_mdh_mailbox;
2427     int      ret, i;
2428     gni_return_t status;
2429     uint32_t              vmdh_index = -1;
2430     mdh_addr_t            base_infor;
2431     mdh_addr_t            *base_addr_vec;
2432     char *env;
2433
2434     if(mysize <=512)
2435     {
2436         SMSG_MAX_MSG = 1024;
2437     }else if (mysize <= 4096)
2438     {
2439         SMSG_MAX_MSG = 1024;
2440     }else if (mysize <= 16384)
2441     {
2442         SMSG_MAX_MSG = 512;
2443     }else {
2444         SMSG_MAX_MSG = 256;
2445     }
2446     
2447     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2448     if (env) SMSG_MAX_MSG = atoi(env);
2449     CmiAssert(SMSG_MAX_MSG > 0);
2450
2451     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2452     
2453     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2454     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2455     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2456     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2457     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2458     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2459     CmiAssert(ret == 0);
2460     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2461     
2462     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2463             smsg_memlen*(mysize), smsg_rx_cqh,
2464             GNI_MEM_READWRITE,   
2465             vmdh_index,
2466             &my_smsg_mdh_mailbox);
2467     register_memory_size += smsg_memlen*(mysize);
2468     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2469
2470     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2471     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2472
2473     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2474     base_infor.mdh =  my_smsg_mdh_mailbox;
2475     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2476
2477     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2478  
2479     for(i=0; i<mysize; i++)
2480     {
2481         if(i==myrank)
2482             continue;
2483         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2484         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2485         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2486         smsg_attr[i].mbox_offset = i*smsg_memlen;
2487         smsg_attr[i].buff_size = smsg_memlen;
2488         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2489         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2490     }
2491
2492     for(i=0; i<mysize; i++)
2493     {
2494         if (myrank == i) continue;
2495
2496         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2497         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2498         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2499         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2500         remote_smsg_attr.buff_size = smsg_memlen;
2501         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2502         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2503
2504         /* initialize the smsg channel */
2505         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2506         GNI_RC_CHECK("SMSG Init", status);
2507     } //end initialization
2508
2509     free(base_addr_vec);
2510     free(smsg_attr);
2511
2512     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2513     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2514
2515
2516 inline
2517 static void _init_send_queue(SMSG_QUEUE *queue)
2518 {
2519      int i;
2520 #if ONE_SEND_QUEUE
2521      queue->sendMsgBuf = PCQueueCreate();
2522      destpe_avail = (char*)malloc(mysize * sizeof(char));
2523 #else
2524      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2525 #if CMK_SMP && SMP_LOCKS
2526      nonEmptyQueues = PCQueueCreate();
2527 #endif
2528      for(i =0; i<mysize; i++)
2529      {
2530 #if CMK_SMP
2531          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2532 #if SMP_LOCKS
2533          queue->smsg_msglist_index[i].pushed = 0;
2534          queue->smsg_msglist_index[i].lock = CmiCreateLock();
2535 #endif
2536 #else
2537          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2538          queue->smsg_msglist_index[i].next = -1;
2539          queue->smsg_head_index = -1;
2540 #endif
2541         
2542      }
2543 #endif
2544 }
2545
2546 inline
2547 static void _init_smsg()
2548 {
2549     if(mysize > 1) {
2550         if (useDynamicSMSG)
2551             _init_dynamic_smsg();
2552         else
2553             _init_static_smsg();
2554     }
2555
2556     _init_send_queue(&smsg_queue);
2557 #if CMK_USE_OOB
2558     _init_send_queue(&smsg_oob_queue);
2559 #endif
2560 }
2561
2562 static void _init_static_msgq()
2563 {
2564     gni_return_t status;
2565     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2566     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2567     msgq_attrs.smsg_q_sz = 1;
2568     msgq_attrs.rcv_pool_sz = 1;
2569     msgq_attrs.num_msgq_eps = 2;
2570     msgq_attrs.nloc_insts = 8;
2571     msgq_attrs.modes = 0;
2572     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2573
2574     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2575     GNI_RC_CHECK("MSGQ Init", status);
2576
2577
2578 }
2579
2580 #if CMK_SMP && STEAL_MEMPOOL
2581 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2582 {
2583     void *pool = NULL;
2584     int i, k;
2585     // check other ranks
2586     for (k=0; k<CmiMyNodeSize()+1; k++) {
2587         i = (CmiMyRank()+k)%CmiMyNodeSize();
2588         if (i==CmiMyRank()) continue;
2589         mempool_type *mptr = CpvAccessOther(mempool, i);
2590         CmiLock(mptr->mempoolLock);
2591         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2592         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2593             CmiUnlock(mptr->mempoolLock);
2594             continue;
2595         }
2596         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2597         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2598             /* search in the free list */
2599           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2600           mempool_header *current = free_header;
2601           while (current) {
2602             if (current->next_free == (char*)header-(char*)mptr) break;
2603             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2604           }
2605           if (current == NULL) {         /* not found in free list */
2606             CmiUnlock(mptr->mempoolLock);
2607             continue;
2608           }
2609 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2610             /* search the previous memblock, and remove the tail */
2611           mempool_block *ptr = (mempool_block *)mptr;
2612           while (ptr) {
2613             if (ptr->memblock_next == mptr->memblock_tail) break;
2614             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2615           }
2616           CmiAssert(ptr!=NULL);
2617           ptr->memblock_next = 0;
2618           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2619
2620             /* remove memblock from the free list */
2621           current->next_free = header->next_free;
2622           if (header == free_header) mptr->freelist_head = header->next_free;
2623
2624           CmiUnlock(mptr->mempoolLock);
2625
2626           pool = (void*)tail;
2627           *mem_hndl = tail->mem_hndl;
2628           *size = tail->size;
2629           return pool;
2630         }
2631         CmiUnlock(mptr->mempoolLock);
2632     }
2633
2634       /* steal failed, deregister and free memblock now */
2635     int freed = 0;
2636     for (k=0; k<CmiMyNodeSize()+1; k++) {
2637         i = (CmiMyRank()+k)%CmiMyNodeSize();
2638         mempool_type *mptr = CpvAccessOther(mempool, i);
2639         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2640
2641         mempool_block *mempools_head = &(mptr->mempools_head);
2642         mempool_block *current = mempools_head;
2643         mempool_block *prev = NULL;
2644
2645         while (current) {
2646           int isfree = 0;
2647           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2648 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2649           mempool_header *cur = free_header;
2650           mempool_header *header;
2651           if (current != mempools_head) {
2652             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2653              /* search in free list */
2654             if (header->size == current->size - sizeof(mempool_block)) {
2655               cur = free_header;
2656               while (cur) {
2657                 if (cur->next_free == (char*)header-(char*)mptr) break;
2658                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2659               }
2660               if (cur != NULL) isfree = 1;
2661             }
2662           }
2663           if (isfree) {
2664               /* remove from free list */
2665             cur->next_free = header->next_free;
2666             if (header == free_header) mptr->freelist_head = header->next_free;
2667              // deregister
2668             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2669             GNI_RC_CHECK("steal Mempool de-register", status);
2670             mempool_block *ptr = current;
2671             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2672             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2673 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2674             freed += ptr->size;
2675             free(ptr);
2676              // try now
2677             if (freed > *size) {
2678               if (pool == NULL) {
2679                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2680                 CmiAssert(ret == 0);
2681               }
2682               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2683               if (status == GNI_RC_SUCCESS) {
2684                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2685 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2686                 return pool;
2687               }
2688 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2689             }
2690           }
2691           else {
2692              prev = current;
2693              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2694           }
2695         }
2696
2697         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2698     }
2699       /* still no luck registering pool */
2700     if (pool) free(pool);
2701     return NULL;
2702 }
2703 #endif
2704
2705 static long long int total_mempool_size = 0;
2706 static long long int total_mempool_calls = 0;
2707
2708 #if USE_LRTS_MEMPOOL
2709 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2710 {
2711     void *pool;
2712     int ret;
2713
2714     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
2715     if (*size < default_size) *size = default_size;
2716     total_mempool_size += *size;
2717     total_mempool_calls += 1;
2718     if (*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) 
2719     {
2720         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
2721         CmiAbort("alloc_mempool_block");
2722     }
2723     ret = posix_memalign(&pool, ALIGNBUF, *size);
2724     if (ret != 0) {
2725 #if CMK_SMP && STEAL_MEMPOOL
2726       pool = steal_mempool_block(size, mem_hndl);
2727       if (pool != NULL) return pool;
2728 #endif
2729       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2730       if (ret == ENOMEM)
2731         CmiAbort("alloc_mempool_block: out of memory.");
2732       else
2733         CmiAbort("alloc_mempool_block: posix_memalign failed");
2734     }
2735     SetMemHndlZero((*mem_hndl));
2736     
2737     return pool;
2738 }
2739
2740 // ptr is a block head pointer
2741 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2742 {
2743     if(!(IsMemHndlZero(mem_hndl)))
2744     {
2745         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
2746     }
2747     free(ptr);
2748 }
2749 #endif
2750
2751 void LrtsPreCommonInit(int everReturn){
2752 #if USE_LRTS_MEMPOOL
2753     CpvInitialize(mempool_type*, mempool);
2754     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
2755     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2756 #endif
2757 }
2758
2759 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2760 {
2761     register int            i;
2762     int                     rc;
2763     int                     device_id = 0;
2764     unsigned int            remote_addr;
2765     gni_cdm_handle_t        cdm_hndl;
2766     gni_return_t            status = GNI_RC_SUCCESS;
2767     uint32_t                vmdh_index = -1;
2768     uint8_t                 ptag;
2769     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2770     int                     first_spawned;
2771     int                     physicalID;
2772     char                   *env;
2773
2774     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2775     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2776     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2777    
2778     status = PMI_Init(&first_spawned);
2779     GNI_RC_CHECK("PMI_Init", status);
2780
2781     status = PMI_Get_size(&mysize);
2782     GNI_RC_CHECK("PMI_Getsize", status);
2783
2784     status = PMI_Get_rank(&myrank);
2785     GNI_RC_CHECK("PMI_getrank", status);
2786
2787     //physicalID = CmiPhysicalNodeID(myrank);
2788     
2789     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2790
2791     *myNodeID = myrank;
2792     *numNodes = mysize;
2793   
2794 #if MULTI_THREAD_SEND
2795     /* Currently, we only consider the case that comm. thread will only recv msgs */
2796     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2797 #endif
2798
2799     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2800     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2801     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2802
2803     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2804     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2805     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2806
2807     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
2808     if (env) useDynamicSMSG = 1;
2809     if (!useDynamicSMSG)
2810       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2811     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2812     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2813     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2814     
2815     if(myrank == 0)
2816     {
2817         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2818         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2819     }
2820 #ifdef USE_ONESIDED
2821     onesided_init(NULL, &onesided_hnd);
2822
2823     // this is a GNI test, so use the libonesided bypass functionality
2824     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2825     local_addr = gniGetNicAddress();
2826 #else
2827     ptag = get_ptag();
2828     cookie = get_cookie();
2829 #if 0
2830     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2831 #endif
2832     //Create and attach to the communication  domain */
2833     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2834     GNI_RC_CHECK("GNI_CdmCreate", status);
2835     //* device id The device id is the minor number for the device
2836     //that is assigned to the device by the system when the device is created.
2837     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2838     //where X is the device number 0 default 
2839     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2840     GNI_RC_CHECK("GNI_CdmAttach", status);
2841     local_addr = get_gni_nic_address(0);
2842 #endif
2843     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2844     _MEMCHECK(MPID_UGNI_AllAddr);
2845     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2846     /* create the local completion queue */
2847     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2848     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2849     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2850     
2851     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2852     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2853     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2854
2855     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2856     GNI_RC_CHECK("Create CQ (rx)", status);
2857     
2858     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2859     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2860     
2861     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2862     //GNI_RC_CHECK("Create BTE CQ", status);
2863
2864     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2865     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2866     _MEMCHECK(ep_hndl_array);
2867 #if CMK_SMP && !COMM_THREAD_SEND
2868     tx_cq_lock  = CmiCreateLock();
2869     rx_cq_lock  = CmiCreateLock();
2870 #endif
2871     for (i=0; i<mysize; i++) {
2872         if(i == myrank) continue;
2873         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2874         GNI_RC_CHECK("GNI_EpCreate ", status);   
2875         remote_addr = MPID_UGNI_AllAddr[i];
2876         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2877         GNI_RC_CHECK("GNI_EpBind ", status);   
2878     }
2879
2880     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2881     _init_smsg();
2882     PMI_Barrier();
2883
2884 #if     USE_LRTS_MEMPOOL
2885     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
2886     if (env) {
2887         _totalmem = CmiReadSize(env);
2888         if (myrank == 0)
2889             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
2890     }
2891
2892     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
2893     if (env) _mempool_size = CmiReadSize(env);
2894     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
2895         _mempool_size = CmiReadSize(env);
2896
2897
2898     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2899     if (env) {
2900         MAX_REG_MEM = CmiReadSize(env);
2901         user_set_flag = 1;
2902     }
2903     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
2904         MAX_REG_MEM = CmiReadSize(env);
2905         user_set_flag = 1;
2906     }
2907
2908     env = getenv("CHARM_UGNI_SEND_MAX");
2909     if (env) {
2910         MAX_BUFF_SEND = CmiReadSize(env);
2911         user_set_flag = 1;
2912     }
2913     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
2914         MAX_BUFF_SEND = CmiReadSize(env);
2915         user_set_flag = 1;
2916     }
2917
2918     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
2919     if (env) {
2920         _mempool_size_limit = CmiReadSize(env);
2921     }
2922
2923     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2924     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2925
2926     if (myrank==0) {
2927         printf("Charm++> memory pool init size: %1.fMB, max size: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
2928         printf("Charm++> memory pool max size: %1.fMB, max for send: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
2929         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
2930             /* memblock can expand to BIG_MSG * 2 size */
2931             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
2932             CmiAbort("mempool maximum size is too small. \n");
2933         }
2934 #if CMK_SMP && MULTI_THREAD_SEND
2935         printf("Charm++> worker thread sending messages\n");
2936 #elif CMK_SMP && COMM_THREAD_SEND
2937         printf("Charm++> only comm thread send/recv messages\n");
2938 #endif
2939     }
2940
2941 #endif     /* end of USE_LRTS_MEMPOOL */
2942
2943     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
2944     if (env) {
2945         BIG_MSG = CmiReadSize(env);
2946         if (BIG_MSG < ONE_SEG)
2947           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
2948     }
2949     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
2950     if (env) {
2951         BIG_MSG_PIPELINE = atoi(env);
2952     }
2953
2954     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
2955     if (env) _checkProgress = 0;
2956     if (mysize == 1) _checkProgress = 0;
2957
2958     
2959     /*
2960     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
2961     if (env) 
2962         _tlbpagesize = CmiReadSize(env);
2963     */
2964     /* real gethugepagesize() is only available when hugetlb module linked */
2965     _tlbpagesize = gethugepagesize();
2966     if (myrank == 0) {
2967         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
2968     }
2969
2970     /* init DMA buffer for medium message */
2971
2972     //_init_DMA_buffer();
2973     
2974     free(MPID_UGNI_AllAddr);
2975 #if CMK_SMP
2976     sendRdmaBuf = PCQueueCreate();
2977 #else
2978     sendRdmaBuf = 0;
2979 #endif
2980 #if MACHINE_DEBUG_LOG
2981     char ln[200];
2982     sprintf(ln,"debugLog.%d",myrank);
2983     debugLog=fopen(ln,"w");
2984 #endif
2985
2986 }
2987
2988 void* LrtsAlloc(int n_bytes, int header)
2989 {
2990     void *ptr;
2991 #if 0
2992     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2993 #endif
2994     if(n_bytes <= SMSG_MAX_MSG)
2995     {
2996         int totalsize = n_bytes+header;
2997         ptr = malloc(totalsize);
2998     }
2999     else {
3000         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3001 #if     USE_LRTS_MEMPOOL
3002         n_bytes = ALIGN64(n_bytes);
3003         if(n_bytes < BIG_MSG)
3004         {
3005             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3006             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3007         }else 
3008         {
3009             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3010             ptr = res + ALIGNBUF - header;
3011
3012         }
3013 #else
3014         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
3015         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3016         ptr = res + ALIGNBUF - header;
3017 #endif
3018     }
3019     return ptr;
3020 }
3021
3022 void  LrtsFree(void *msg)
3023 {
3024     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3025     if (size <= SMSG_MAX_MSG)
3026       free(msg);
3027     else if(size>=BIG_MSG)
3028     {
3029         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3030
3031     }else
3032     {
3033 #if     USE_LRTS_MEMPOOL
3034 #if CMK_SMP
3035         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3036 #else
3037         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3038 #endif
3039 #else
3040         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3041 #endif
3042     }
3043 }
3044
3045 void LrtsExit()
3046 {
3047     /* free memory ? */
3048 #if USE_LRTS_MEMPOOL
3049     mempool_destroy(CpvAccess(mempool));
3050 #endif
3051     PMI_Finalize();
3052     exit(0);
3053 }
3054
3055 void LrtsDrainResources()
3056 {
3057     if(mysize == 1) return;
3058     while (
3059 #if CMK_USE_OOB
3060            !SendBufferMsg(&smsg_oob_queue) ||
3061 #endif
3062            !SendBufferMsg(&smsg_queue))
3063     {
3064         if (useDynamicSMSG)
3065             PumpDatagramConnection();
3066         PumpNetworkSmsg();
3067         PumpLocalRdmaTransactions();
3068         SendRdmaMsg();
3069     }
3070     PMI_Barrier();
3071 }
3072
3073 void LrtsAbort(const char *message) {
3074     printf("CmiAbort is calling on PE:%d\n", myrank);
3075     CmiPrintStackTrace(0);
3076     PMI_Abort(-1, message);
3077 }
3078
3079 /**************************  TIMER FUNCTIONS **************************/
3080 #if CMK_TIMER_USE_SPECIAL
3081 /* MPI calls are not threadsafe, even the timer on some machines */
3082 static CmiNodeLock  timerLock = 0;
3083 static int _absoluteTime = 0;
3084 static int _is_global = 0;
3085 static struct timespec start_ts;
3086
3087 inline int CmiTimerIsSynchronized() {
3088     return 0;
3089 }
3090
3091 inline int CmiTimerAbsolute() {
3092     return _absoluteTime;
3093 }
3094
3095 double CmiStartTimer() {
3096     return 0.0;
3097 }
3098
3099 double CmiInitTime() {
3100     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3101 }
3102
3103 void CmiTimerInit(char **argv) {
3104     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3105     if (_absoluteTime && CmiMyPe() == 0)
3106         printf("Charm++> absolute  timer is used\n");
3107     
3108     _is_global = CmiTimerIsSynchronized();
3109
3110
3111     if (_is_global) {
3112         if (CmiMyRank() == 0) {
3113             clock_gettime(CLOCK_MONOTONIC, &start_ts);
3114         }
3115     } else { /* we don't have a synchronous timer, set our own start time */
3116         CmiBarrier();
3117         CmiBarrier();
3118         CmiBarrier();
3119         clock_gettime(CLOCK_MONOTONIC, &start_ts);
3120     }
3121     CmiNodeAllBarrier();          /* for smp */
3122 }
3123
3124 /**
3125  * Since the timerLock is never created, and is
3126  * always NULL, then all the if-condition inside
3127  * the timer functions could be disabled right
3128  * now in the case of SMP.
3129  */
3130 double CmiTimer(void) {
3131     struct timespec now_ts;
3132     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3133     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3134         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3135 }
3136
3137 double CmiWallTimer(void) {
3138     struct timespec now_ts;
3139     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3140     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3141         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
3142 }
3143
3144 double CmiCpuTimer(void) {
3145     struct timespec now_ts;
3146     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3147     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3148         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3149 }
3150
3151 #endif
3152 /************Barrier Related Functions****************/
3153
3154 int CmiBarrier()
3155 {
3156     gni_return_t status;
3157
3158 #if CMK_SMP
3159     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
3160     CmiNodeAllBarrier();
3161     if (CmiMyRank() == CmiMyNodeSize())
3162 #else
3163     if (CmiMyRank() == 0)
3164 #endif
3165     {
3166         /**
3167          *  The call of CmiBarrier is usually before the initialization
3168          *  of trace module of Charm++, therefore, the START_EVENT
3169          *  and END_EVENT are disabled here. -Chao Mei
3170          */
3171         /*START_EVENT();*/
3172         status = PMI_Barrier();
3173         GNI_RC_CHECK("PMI_Barrier", status);
3174         /*END_EVENT(10);*/
3175     }
3176     CmiNodeAllBarrier();
3177     return status;
3178
3179 }
3180 #if CMK_DIRECT
3181 #include "machine-cmidirect.c"
3182 #endif
3183 #if CMK_PERSISTENT_COMM
3184 #include "machine-persistent.c"
3185 #endif
3186
3187