fix about smsg queue status check
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
16
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
20
21     other environment variables:
22
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27  */
28 /*@{*/
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37
38 #include <gni_pub.h>
39 #include <pmi.h>
40
41 #include "converse.h"
42
43 #if CMK_DIRECT
44 #include "cmidirect.h"
45 #endif
46 #define PRINT_SYH  0
47
48 #define USE_LRTS_MEMPOOL                  1
49 #define REMOTE_EVENT                      0
50
51 #define CMI_EXERT_SEND_CAP      0
52 #define CMI_EXERT_RECV_CAP      0
53
54 #if CMI_EXERT_SEND_CAP
55 #define SEND_CAP 16
56 #endif
57
58 #if CMI_EXERT_RECV_CAP
59 #define RECV_CAP 2
60 #endif
61
62 #if CMK_SMP
63 #define COMM_THREAD_SEND 1
64 //#define MULTI_THREAD_SEND 1
65 #endif
66
67 // Trace communication thread
68 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
69 #define TRACE_THRESHOLD     0.00005
70 #define CMI_MPI_TRACE_MOREDETAILED 0
71 #undef CMI_MPI_TRACE_USEREVENTS
72 #define CMI_MPI_TRACE_USEREVENTS 1
73 #else
74 #undef CMK_SMP_TRACE_COMMTHREAD
75 #define CMK_SMP_TRACE_COMMTHREAD 0
76 #endif
77
78 #define CMK_TRACE_COMMOVERHEAD 0
79 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
80 #undef CMI_MPI_TRACE_USEREVENTS
81 #define CMI_MPI_TRACE_USEREVENTS 1
82 #else
83 #undef CMK_TRACE_COMMOVERHEAD
84 #define CMK_TRACE_COMMOVERHEAD 0
85 #endif
86
87 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
88 CpvStaticDeclare(double, projTraceStart);
89 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
90 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
91 #else
92 #define  START_EVENT()
93 #define  END_EVENT(x)
94 #endif
95
96 #if USE_LRTS_MEMPOOL
97
98 #define oneMB (1024ll*1024)
99 #define oneGB (1024ll*1024*1024)
100
101 static CmiInt8 _mempool_size = 8*oneMB;
102 static CmiInt8 _expand_mem =  4*oneMB;
103 static CmiInt8 _mempool_size_limit = 0;
104
105 static CmiInt8 _totalmem = 0.8*oneGB;
106
107 static int BIG_MSG  =  4*oneMB;
108 static int ONE_SEG  =  2*oneMB;
109 static int BIG_MSG_PIPELINE = 4;
110
111 // dynamic flow control
112 static CmiInt8 buffered_send_msg = 0;
113 static int   register_memory_size = 0;
114
115 #if CMK_SMP && COMM_THREAD_SEND 
116 //Dynamic flow control about memory registration
117 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
118 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
119 #else
120 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
121 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
122 #endif
123
124 #endif     /* end USE_LRTS_MEMPOOL */
125
126 #if CMK_SMP && MULTI_THREAD_SEND
127 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
128 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
129 #else
130 #define     CMI_GNI_LOCK
131 #define     CMI_GNI_UNLOCK
132 #endif
133
134 static int   user_set_flag  = 0;
135
136 static int _checkProgress = 1;             /* check deadlock */
137 static int _detected_hang = 0;
138
139 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
140
141 // dynamic SMSG
142 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
143
144 static int avg_smsg_connection = 32;
145 static int                 *smsg_connected_flag= 0;
146 static gni_smsg_attr_t     **smsg_attr_vector_local;
147 static gni_smsg_attr_t     **smsg_attr_vector_remote;
148 static gni_ep_handle_t     ep_hndl_unbound;
149 static gni_smsg_attr_t     send_smsg_attr;
150 static gni_smsg_attr_t     recv_smsg_attr;
151
152 typedef struct _dynamic_smsg_mailbox{
153    void     *mailbox_base;
154    int      size;
155    int      offset;
156    gni_mem_handle_t  mem_hndl;
157    struct      _dynamic_smsg_mailbox  *next;
158 }dynamic_smsg_mailbox_t;
159
160 static dynamic_smsg_mailbox_t  *mailbox_list;
161
162 int         rdma_id = 0;
163
164 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
165 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
166
167 #if PRINT_SYH
168 int         lrts_send_msg_id = 0;
169 int         lrts_local_done_msg = 0;
170 int         lrts_send_rdma_success = 0;
171 #endif
172
173 #include "machine.h"
174
175 #include "pcqueue.h"
176
177 #include "mempool.h"
178
179 #if CMK_PERSISTENT_COMM
180 #include "machine-persistent.h"
181 #endif
182
183 //#define  USE_ONESIDED 1
184 #ifdef USE_ONESIDED
185 //onesided implementation is wrong, since no place to restore omdh
186 #include "onesided.h"
187 onesided_hnd_t   onesided_hnd;
188 onesided_md_t    omdh;
189 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
190
191 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
192
193 #else
194 uint8_t   onesided_hnd, omdh;
195 #if REMOTE_EVENT
196 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
197          status = GNI_RC_ERROR_NOMEM;} \
198         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
199                 if(status == GNI_RC_SUCCESS) register_memory_size += size;} }
200 #else
201 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
202     do {   \
203         if (register_memory_size + size >= MAX_REG_MEM) { \
204             status = GNI_RC_ERROR_NOMEM; \
205         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
206             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
207     } while(0)
208 #endif
209 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
210     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
211              register_memory_size -= size; \
212          else CmiAbort("MEM_DEregister");  \
213     } while (0)
214 #endif
215
216 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
217 #define   IncreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)++
218 #define   DecreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)--
219 #define   IncreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)++
220 #define   DecreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)--
221 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
222 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
223 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
224 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
225 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
226 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
227 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
228 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
229 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
230
231 #define   GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
232 #define   GetSizeFromBlockHeader(x)    ((block_header*)x)->size
233
234 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
235 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
236 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
237 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
238
239 #define ALIGNBUF                64
240
241 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
242 /* If SMSG is not used */
243
244 #define FMA_PER_CORE  1024
245 #define FMA_BUFFER_SIZE 1024
246
247 /* If SMSG is used */
248 static int  SMSG_MAX_MSG = 1024;
249 #define SMSG_MAX_CREDIT 72 
250
251 #define MSGQ_MAXSIZE       2048
252 /* large message transfer with FMA or BTE */
253 #define LRTS_GNI_RDMA_THRESHOLD  1024 
254
255 #if CMK_SMP
256 static int  REMOTE_QUEUE_ENTRIES=163840; 
257 static int LOCAL_QUEUE_ENTRIES=163840; 
258 #else
259 static int  REMOTE_QUEUE_ENTRIES=20480;
260 static int LOCAL_QUEUE_ENTRIES=20480; 
261 #endif
262
263 #define BIG_MSG_TAG  0x26
264 #define PUT_DONE_TAG      0x28
265 #define DIRECT_PUT_DONE_TAG      0x29
266 #define ACK_TAG           0x30
267 /* SMSG is data message */
268 #define SMALL_DATA_TAG          0x31
269 /* SMSG is a control message to initialize a BTE */
270 #define MEDIUM_HEAD_TAG         0x32
271 #define MEDIUM_DATA_TAG         0x33
272 #define LMSG_INIT_TAG           0x39 
273 #define VERY_LMSG_INIT_TAG      0x40 
274 #define VERY_LMSG_TAG           0x41 
275
276 #define DEBUG
277 #ifdef GNI_RC_CHECK
278 #undef GNI_RC_CHECK
279 #endif
280 #ifdef DEBUG
281 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
282 #else
283 #define GNI_RC_CHECK(msg,rc)
284 #endif
285
286 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
287 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
288
289 static int useStaticMSGQ = 0;
290 static int useStaticFMA = 0;
291 static int mysize, myrank;
292 gni_nic_handle_t      nic_hndl;
293
294 typedef struct {
295     gni_mem_handle_t mdh;
296     uint64_t addr;
297 } mdh_addr_t ;
298 // this is related to dynamic SMSG
299
300 typedef struct mdh_addr_list{
301     gni_mem_handle_t mdh;
302    void *addr;
303     struct mdh_addr_list *next;
304 }mdh_addr_list_t;
305
306 static unsigned int         smsg_memlen;
307 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
308 mdh_addr_t          setup_mem;
309 mdh_addr_t          *smsg_connection_vec = 0;
310 gni_mem_handle_t    smsg_connection_memhndl;
311 static int          smsg_expand_slots = 10;
312 static int          smsg_available_slot = 0;
313 static void         *smsg_mailbox_mempool = 0;
314 mdh_addr_list_t     *smsg_dynamic_list = 0;
315
316 static void             *smsg_mailbox_base;
317 gni_msgq_attr_t         msgq_attrs;
318 gni_msgq_handle_t       msgq_handle;
319 gni_msgq_ep_attr_t      msgq_ep_attrs;
320 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
321
322 /* =====Beginning of Declarations of Machine Specific Variables===== */
323 static int cookie;
324 static int modes = 0;
325 static gni_cq_handle_t       smsg_rx_cqh = NULL;
326 static gni_cq_handle_t       smsg_tx_cqh = NULL;
327 static gni_cq_handle_t       post_rx_cqh = NULL;
328 static gni_cq_handle_t       post_tx_cqh = NULL;
329 static gni_ep_handle_t       *ep_hndl_array;
330 #if CMK_SMP && MULTI_THREAD_SEND
331 static CmiNodeLock           *ep_lock_array;
332 static CmiNodeLock           tx_cq_lock; 
333 static CmiNodeLock           rx_cq_lock;
334 static CmiNodeLock           *mempool_lock;
335 #endif
336
337 typedef struct msg_list
338 {
339     uint32_t destNode;
340     uint32_t size;
341     void *msg;
342     uint8_t tag;
343 #if !CMK_SMP
344     struct msg_list *next;
345 #endif
346 }MSG_LIST;
347
348
349 typedef struct control_msg
350 {
351     uint64_t            source_addr;    /* address from the start of buffer  */
352     uint64_t            dest_addr;      /* address from the start of buffer */
353     int                 total_length;   /* total length */
354     int                 length;         /* length of this packet */
355     uint8_t             seq_id;         //big message   0 meaning single message
356     gni_mem_handle_t    source_mem_hndl;
357     struct control_msg *next;
358 } CONTROL_MSG;
359
360 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
361
362 typedef struct ack_msg
363 {
364     uint64_t            source_addr;    /* address from the start of buffer  */
365 #if ! USE_LRTS_MEMPOOL
366     gni_mem_handle_t    source_mem_hndl;
367     int                 length;          /* total length */
368 #endif
369     struct ack_msg     *next;
370 } ACK_MSG;
371
372 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
373
374 #if CMK_DIRECT
375 typedef struct{
376     uint64_t    handler_addr;
377 }CMK_DIRECT_HEADER;
378
379 typedef struct {
380     char core[CmiMsgHeaderSizeBytes];
381     uint64_t handler;
382 }cmidirectMsg;
383
384 //SYH
385 CpvDeclare(int, CmiHandleDirectIdx);
386 void CmiHandleDirectMsg(cmidirectMsg* msg)
387 {
388
389     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
390    (*(_handle->callbackFnPtr))(_handle->callbackData);
391    CmiFree(msg);
392 }
393
394 void CmiDirectInit()
395 {
396     CpvInitialize(int,  CmiHandleDirectIdx);
397     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
398 }
399
400 #endif
401 typedef struct  rmda_msg
402 {
403     int                   destNode;
404     gni_post_descriptor_t *pd;
405 #if !CMK_SMP
406     struct  rmda_msg      *next;
407 #endif
408 }RDMA_REQUEST;
409
410
411 #if CMK_SMP
412
413 #define ONE_SEND_QUEUE                  0
414 PCQueue sendRdmaBuf;
415 typedef struct  msg_list_index
416 {
417     PCQueue     sendSmsgBuf;
418 } MSG_LIST_INDEX;
419
420 #else         /* non-smp */
421
422 static RDMA_REQUEST        *sendRdmaBuf = 0;
423 static RDMA_REQUEST        *sendRdmaTail = 0;
424 typedef struct  msg_list_index
425 {
426     int         next;
427     MSG_LIST    *sendSmsgBuf;
428     MSG_LIST    *tail;
429 } MSG_LIST_INDEX;
430
431 #endif
432
433 /* reuse PendingMsg memory */
434 static CONTROL_MSG          *control_freelist=0;
435 static ACK_MSG              *ack_freelist=0;
436 static MSG_LIST             *msglist_freelist=0;
437
438 // buffered send queue
439 #if ! ONE_SEND_QUEUE
440 typedef struct smsg_queue
441 {
442     MSG_LIST_INDEX   *smsg_msglist_index;
443 #if ! CMK_SMP
444     int               smsg_head_index;
445 #endif
446 } SMSG_QUEUE;
447 #else
448 typedef struct smsg_queue
449 {
450     PCQueue       sendMsgBuf;
451 }  SMSG_QUEUE;
452 #endif
453
454 SMSG_QUEUE                  smsg_queue;
455 SMSG_QUEUE                  smsg_oob_queue;
456
457 #if CMK_SMP
458
459 #define FreeMsgList(d)   free(d);
460 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
461
462 #else
463
464 #define FreeMsgList(d)  \
465   (d)->next = msglist_freelist;\
466   msglist_freelist = d;
467
468 #define MallocMsgList(d) \
469   d = msglist_freelist;\
470   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
471              _MEMCHECK(d);\
472   } else msglist_freelist = d->next; \
473   d->next =0;
474 #endif
475
476 #if CMK_SMP
477
478 #define FreeControlMsg(d)      free(d);
479 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
480
481 #else
482
483 #define FreeControlMsg(d)       \
484   (d)->next = control_freelist;\
485   control_freelist = d;
486
487 #define MallocControlMsg(d) \
488   d = control_freelist;\
489   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
490              _MEMCHECK(d);\
491   } else control_freelist = d->next;
492
493 #endif
494
495 #if CMK_SMP
496
497 #define FreeAckMsg(d)      free(d);
498 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
499
500 #else
501
502 #define FreeAckMsg(d)       \
503   (d)->next = ack_freelist;\
504   ack_freelist = d;
505
506 #define MallocAckMsg(d) \
507   d = ack_freelist;\
508   if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
509              _MEMCHECK(d);\
510   } else ack_freelist = d->next;
511
512 #endif
513
514 static RDMA_REQUEST         *rdma_freelist = NULL;
515
516 #define FreeMediumControlMsg(d)       \
517   (d)->next = medium_control_freelist;\
518   medium_control_freelist = d;
519
520
521 #define MallocMediumControlMsg(d) \
522     d = medium_control_freelist;\
523     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
524     _MEMCHECK(d);\
525 } else mediumcontrol_freelist = d->next;
526
527 # if CMK_SMP
528 #define FreeRdmaRequest(d)       free(d);
529 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
530 #else
531
532 #define FreeRdmaRequest(d)       \
533   (d)->next = rdma_freelist;\
534   rdma_freelist = d;
535
536 #define MallocRdmaRequest(d) \
537   d = rdma_freelist;\
538   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
539              _MEMCHECK(d);\
540   } else rdma_freelist = d->next; \
541     d->next =0;
542 #endif
543
544 /* reuse gni_post_descriptor_t */
545 static gni_post_descriptor_t *post_freelist=0;
546
547 #if  !CMK_SMP
548 #define FreePostDesc(d)       \
549     (d)->next_descr = post_freelist;\
550     post_freelist = d;
551
552 #define MallocPostDesc(d) \
553   d = post_freelist;\
554   if (d==0) { \
555      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
556      d->next_descr = 0;\
557       _MEMCHECK(d);\
558   } else post_freelist = d->next_descr;
559 #else
560
561 #define FreePostDesc(d)     free(d);
562 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
563
564 #endif
565
566
567 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
568 static int      buffered_smsg_counter = 0;
569
570 /* SmsgSend return success but message sent is not confirmed by remote side */
571 static MSG_LIST *buffered_fma_head = 0;
572 static MSG_LIST *buffered_fma_tail = 0;
573
574 /* functions  */
575 #define IsFree(a,ind)  !( a& (1<<(ind) ))
576 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
577 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
578
579 CpvDeclare(mempool_type*, mempool);
580
581 /* get the upper bound of log 2 */
582 int mylog2(int size)
583 {
584     int op = size;
585     unsigned int ret=0;
586     unsigned int mask = 0;
587     int i;
588     while(op>0)
589     {
590         op = op >> 1;
591         ret++;
592
593     }
594     for(i=1; i<ret; i++)
595     {
596         mask = mask << 1;
597         mask +=1;
598     }
599
600     ret -= ((size &mask) ? 0:1);
601     return ret;
602 }
603
604 static void
605 allgather(void *in,void *out, int len)
606 {
607     static int *ivec_ptr=NULL,already_called=0,job_size=0;
608     int i,rc;
609     int my_rank;
610     char *tmp_buf,*out_ptr;
611
612     if(!already_called) {
613
614         rc = PMI_Get_size(&job_size);
615         CmiAssert(rc == PMI_SUCCESS);
616         rc = PMI_Get_rank(&my_rank);
617         CmiAssert(rc == PMI_SUCCESS);
618
619         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
620         CmiAssert(ivec_ptr != NULL);
621
622         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
623         CmiAssert(rc == PMI_SUCCESS);
624
625         already_called = 1;
626
627     }
628
629     tmp_buf = (char *)malloc(job_size * len);
630     CmiAssert(tmp_buf);
631
632     rc = PMI_Allgather(in,tmp_buf,len);
633     CmiAssert(rc == PMI_SUCCESS);
634
635     out_ptr = out;
636
637     for(i=0;i<job_size;i++) {
638
639         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
640
641     }
642
643     free(tmp_buf);
644 }
645
646 static void
647 allgather_2(void *in,void *out, int len)
648 {
649     //PMI_Allgather is out of order
650     int i,rc, extend_len;
651     int  rank_index;
652     char *out_ptr, *out_ref;
653     char *in2;
654
655     extend_len = sizeof(int) + len;
656     in2 = (char*)malloc(extend_len);
657
658     memcpy(in2, &myrank, sizeof(int));
659     memcpy(in2+sizeof(int), in, len);
660
661     out_ptr = (char*)malloc(mysize*extend_len);
662
663     rc = PMI_Allgather(in2, out_ptr, extend_len);
664     GNI_RC_CHECK("allgather", rc);
665
666     out_ref = out;
667
668     for(i=0;i<mysize;i++) {
669         //rank index 
670         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
671         //copy to the rank index slot
672         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
673     }
674
675     free(out_ptr);
676     free(in2);
677
678 }
679
680 static unsigned int get_gni_nic_address(int device_id)
681 {
682     unsigned int address, cpu_id;
683     gni_return_t status;
684     int i, alps_dev_id=-1,alps_address=-1;
685     char *token, *p_ptr;
686
687     p_ptr = getenv("PMI_GNI_DEV_ID");
688     if (!p_ptr) {
689         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
690        
691         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
692     } else {
693         while ((token = strtok(p_ptr,":")) != NULL) {
694             alps_dev_id = atoi(token);
695             if (alps_dev_id == device_id) {
696                 break;
697             }
698             p_ptr = NULL;
699         }
700         CmiAssert(alps_dev_id != -1);
701         p_ptr = getenv("PMI_GNI_LOC_ADDR");
702         CmiAssert(p_ptr != NULL);
703         i = 0;
704         while ((token = strtok(p_ptr,":")) != NULL) {
705             if (i == alps_dev_id) {
706                 alps_address = atoi(token);
707                 break;
708             }
709             p_ptr = NULL;
710             ++i;
711         }
712         CmiAssert(alps_address != -1);
713         address = alps_address;
714     }
715     return address;
716 }
717
718 static uint8_t get_ptag(void)
719 {
720     char *p_ptr, *token;
721     uint8_t ptag;
722
723     p_ptr = getenv("PMI_GNI_PTAG");
724     CmiAssert(p_ptr != NULL);
725     token = strtok(p_ptr, ":");
726     ptag = (uint8_t)atoi(token);
727     return ptag;
728         
729 }
730
731 static uint32_t get_cookie(void)
732 {
733     uint32_t cookie;
734     char *p_ptr, *token;
735
736     p_ptr = getenv("PMI_GNI_COOKIE");
737     CmiAssert(p_ptr != NULL);
738     token = strtok(p_ptr, ":");
739     cookie = (uint32_t)atoi(token);
740
741     return cookie;
742 }
743
744 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
745 /* TODO: add any that are related */
746 /* =====End of Definitions of Message-Corruption Related Macros=====*/
747
748
749 #include "machine-lrts.h"
750 #include "machine-common-core.c"
751
752 /* Network progress function is used to poll the network when for
753    messages. This flushes receive buffers on some  implementations*/
754 #if CMK_MACHINE_PROGRESS_DEFINED
755 void CmiMachineProgressImpl() {
756 }
757 #endif
758
759 static void SendRdmaMsg();
760 static void PumpNetworkSmsg();
761 static void PumpLocalRdmaTransactions();
762 static int SendBufferMsg(SMSG_QUEUE *queue);
763
764 #if MACHINE_DEBUG_LOG
765 FILE *debugLog = NULL;
766 static CmiInt8 buffered_recv_msg = 0;
767 int         lrts_smsg_success = 0;
768 int         lrts_received_msg = 0;
769 #endif
770
771 static void sweep_mempool(mempool_type *mptr)
772 {
773     block_header *current = &(mptr->block_head);
774
775     printf("[n %d] sweep_mempool slot START.\n", myrank);
776     while( current!= NULL) {
777         printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
778         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
779     }
780     printf("[n %d] sweep_mempool slot END.\n", myrank);
781 }
782
783 inline
784 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
785 {
786     block_header *current = *from;
787
788     //while(register_memory_size>= MAX_REG_MEM)
789     //{
790         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
791             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
792
793         *from = current;
794         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
795         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
796         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
797     //}
798     return GNI_RC_SUCCESS;
799 }
800
801 inline 
802 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
803 {
804     gni_return_t status = GNI_RC_SUCCESS;
805     //int size = GetMempoolsize(msg);
806     //void *blockaddr = GetMempoolBlockPtr(msg);
807     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
808    
809     block_header *current = &(mptr->block_head);
810     while(register_memory_size>= MAX_REG_MEM)
811     {
812         status = deregisterMemory(mptr, &current);
813         if (status != GNI_RC_SUCCESS) break;
814     }
815     if(register_memory_size>= MAX_REG_MEM) return status;
816
817     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
818     while(1)
819     {
820         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
821         if(status == GNI_RC_SUCCESS)
822         {
823             break;
824         }
825         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
826         {
827             CmiAbort("Memory registor for mempool fails\n");
828         }
829         else
830         {
831             status = deregisterMemory(mptr, &current);
832             if (status != GNI_RC_SUCCESS) break;
833         }
834     }; 
835     return status;
836 }
837
838 inline 
839 static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
840 {
841     static int rank = -1;
842     int i;
843     gni_return_t status;
844     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
845     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
846     mempool_type *mptr;
847
848     status = registerFromMempool(mptr1, msg, size, t);
849     if (status == GNI_RC_SUCCESS) return status;
850 #if CMK_SMP 
851     for (i=0; i<CmiMyNodeSize()+1; i++) {
852       rank = (rank+1)%(CmiMyNodeSize()+1);
853       mptr = CpvAccessOther(mempool, rank);
854       if (mptr == mptr1) continue;
855       status = registerFromMempool(mptr, msg, size, t);
856       if (status == GNI_RC_SUCCESS) return status;
857     }
858 #endif
859     return  GNI_RC_ERROR_RESOURCE;
860 }
861
862 inline
863 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
864 {
865     MSG_LIST        *msg_tmp;
866     MallocMsgList(msg_tmp);
867     msg_tmp->destNode = destNode;
868     msg_tmp->size   = size;
869     msg_tmp->msg    = msg;
870     msg_tmp->tag    = tag;
871
872 #if !CMK_SMP
873     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
874         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
875         queue->smsg_head_index = destNode;
876         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
877     }else
878     {
879         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
880         queue->smsg_msglist_index[destNode].tail = msg_tmp;
881     }
882 #else
883 #if ONE_SEND_QUEUE
884     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
885 #else
886     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
887 #endif
888 #endif
889 #if PRINT_SYH
890     buffered_smsg_counter++;
891 #endif
892 }
893
894 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
895 {
896     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
897 }
898
899 inline
900 static void setup_smsg_connection(int destNode)
901 {
902     mdh_addr_list_t  *new_entry = 0;
903     gni_post_descriptor_t *pd;
904     gni_smsg_attr_t      *smsg_attr;
905     gni_return_t status = GNI_RC_NOT_DONE;
906     RDMA_REQUEST        *rdma_request_msg;
907     
908     if(smsg_available_slot == smsg_expand_slots)
909     {
910         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
911         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
912         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
913
914         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
915             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
916             GNI_MEM_READWRITE,   
917             -1,
918             &(new_entry->mdh));
919         smsg_available_slot = 0; 
920         new_entry->next = smsg_dynamic_list;
921         smsg_dynamic_list = new_entry;
922     }
923     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
924     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
925     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
926     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
927     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
928     smsg_attr->buff_size = smsg_memlen;
929     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
930     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
931     smsg_local_attr_vec[destNode] = smsg_attr;
932     smsg_available_slot++;
933     MallocPostDesc(pd);
934     pd->type            = GNI_POST_FMA_PUT;
935     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
936     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
937     pd->length          = sizeof(gni_smsg_attr_t);
938     pd->local_addr      = (uint64_t) smsg_attr;
939     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
940     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
941     pd->src_cq_hndl     = 0;
942     pd->rdma_mode       = 0;
943     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
944     print_smsg_attr(smsg_attr);
945     if(status == GNI_RC_ERROR_RESOURCE )
946     {
947         MallocRdmaRequest(rdma_request_msg);
948         rdma_request_msg->destNode = destNode;
949         rdma_request_msg->pd = pd;
950         /* buffer this request */
951     }
952 #if PRINT_SYH
953     if(status != GNI_RC_SUCCESS)
954        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
955     else
956         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
957 #endif
958 }
959
960 /* useDynamicSMSG */
961 inline 
962 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
963 {
964     gni_return_t status = GNI_RC_NOT_DONE;
965
966     if(mailbox_list->offset == mailbox_list->size)
967     {
968         dynamic_smsg_mailbox_t *new_mailbox_entry;
969         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
970         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
971         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
972         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
973         new_mailbox_entry->offset = 0;
974         
975         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
976             new_mailbox_entry->size, smsg_rx_cqh,
977             GNI_MEM_READWRITE,   
978             -1,
979             &(new_mailbox_entry->mem_hndl));
980
981         GNI_RC_CHECK("register", status);
982         new_mailbox_entry->next = mailbox_list;
983         mailbox_list = new_mailbox_entry;
984     }
985     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
986     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
987     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
988     local_smsg_attr->mbox_offset = mailbox_list->offset;
989     mailbox_list->offset += smsg_memlen;
990     local_smsg_attr->buff_size = smsg_memlen;
991     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
992     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
993 }
994
995 /* useDynamicSMSG */
996 inline 
997 static int connect_to(int destNode)
998 {
999     gni_return_t status = GNI_RC_NOT_DONE;
1000     CmiAssert(smsg_connected_flag[destNode] == 0);
1001     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1002     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1003     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1004     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1005     
1006     CMI_GNI_LOCK
1007     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1008     CMI_GNI_UNLOCK
1009     if (status == GNI_RC_ERROR_RESOURCE) {
1010       /* possibly destNode is making connection at the same time */
1011       free(smsg_attr_vector_local[destNode]);
1012       smsg_attr_vector_local[destNode] = NULL;
1013       free(smsg_attr_vector_remote[destNode]);
1014       smsg_attr_vector_remote[destNode] = NULL;
1015       mailbox_list->offset -= smsg_memlen;
1016       return 0;
1017     }
1018     GNI_RC_CHECK("GNI_Post", status);
1019     smsg_connected_flag[destNode] = 1;
1020     return 1;
1021 }
1022
1023 inline 
1024 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
1025 {
1026     unsigned int          remote_address;
1027     uint32_t              remote_id;
1028     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1029     gni_smsg_attr_t       *smsg_attr;
1030     gni_post_descriptor_t *pd;
1031     gni_post_state_t      post_state;
1032     char                  *real_data; 
1033
1034     if (useDynamicSMSG) {
1035         switch (smsg_connected_flag[destNode]) {
1036         case 0: 
1037             connect_to(destNode);         /* continue to case 1 */
1038         case 1:                           /* pending connection, do nothing */
1039             status = GNI_RC_NOT_DONE;
1040             if(inbuff ==0)
1041                 buffer_small_msgs(queue, msg, size, destNode, tag);
1042             return status;
1043         }
1044     }
1045 #if CMK_SMP
1046 #if ONE_SEND_QUEUE
1047     //if(PCQueueEmpty(queue->sendMsgBuf) || inbuff==1)
1048     //if( inbuff==1)
1049 #else
1050     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1051 #endif
1052     {
1053 #else
1054     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1055     {
1056 #endif
1057         CMI_GNI_LOCK
1058         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
1059         CMI_GNI_UNLOCK
1060         if(status == GNI_RC_SUCCESS)
1061         {
1062 #if CMK_SMP_TRACE_COMMTHREAD
1063             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1064             { 
1065                 START_EVENT();
1066                 if ( tag == SMALL_DATA_TAG)
1067                     real_data = (char*)msg; 
1068                 else 
1069                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1070                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1071             }
1072 #endif
1073             smsg_send_count ++;
1074             return status;
1075         }else
1076             status = GNI_RC_ERROR_RESOURCE;
1077     }
1078     if(inbuff ==0)
1079         buffer_small_msgs(queue, msg, size, destNode, tag);
1080     return status;
1081 }
1082
1083 inline 
1084 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1085 {
1086     /* construct a control message and send */
1087     CONTROL_MSG         *control_msg_tmp;
1088     MallocControlMsg(control_msg_tmp);
1089     control_msg_tmp->source_addr = (uint64_t)msg;
1090     control_msg_tmp->seq_id    = seqno;
1091     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1092 #if     USE_LRTS_MEMPOOL
1093     if(size < BIG_MSG)
1094     {
1095         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1096     }
1097     else
1098     {
1099         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1100         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1101         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1102     }
1103 #else
1104     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1105 #endif
1106     return control_msg_tmp;
1107 }
1108
1109 #define BLOCKING_SEND_CONTROL    0
1110
1111 // Large message, send control to receiver, receiver register memory and do a GET, 
1112 // return 1 - send no success
1113 inline
1114 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1115 {
1116     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1117     uint32_t            vmdh_index  = -1;
1118     int                 size;
1119     int                 offset = 0;
1120     uint64_t            source_addr;
1121     int                 register_size; 
1122     void                *msg;
1123
1124     size    =   control_msg_tmp->total_length;
1125     source_addr = control_msg_tmp->source_addr;
1126     register_size = control_msg_tmp->length;
1127
1128 #if  USE_LRTS_MEMPOOL
1129     if( control_msg_tmp->seq_id == 0 ){
1130 #if BLOCKING_SEND_CONTROL
1131         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1132             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1133                 LrtsAdvanceCommunication(0);
1134         }
1135 #endif
1136         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1137         {
1138             msg = (void*)source_addr;
1139             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1140             {
1141                 if(!inbuff)
1142                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1143                 return GNI_RC_ERROR_NOMEM;
1144             }
1145             //register the corresponding mempool
1146             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1147             if(status == GNI_RC_SUCCESS)
1148             {
1149                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1150             }
1151         }else
1152         {
1153             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1154             status = GNI_RC_SUCCESS;
1155         }
1156         if(NoMsgInSend( control_msg_tmp->source_addr))
1157             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1158         else
1159             register_size = 0;
1160     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1161     {
1162         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1163         source_addr += offset;
1164         size = control_msg_tmp->length;
1165 #if BLOCKING_SEND_CONTROL
1166         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1167             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1168                 LrtsAdvanceCommunication(0);
1169         }
1170 #endif
1171         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1172             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1173             {
1174                 if(!inbuff)
1175                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1176                 return GNI_RC_ERROR_NOMEM;
1177             }
1178             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1179             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1180         }
1181         else
1182         {
1183             status = GNI_RC_SUCCESS;
1184         }
1185         register_size = 0;  
1186     }
1187
1188     if(status == GNI_RC_SUCCESS)
1189     {
1190         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff);  
1191         if(status == GNI_RC_SUCCESS)
1192         {
1193             buffered_send_msg += register_size;
1194             if(control_msg_tmp->seq_id == 0)
1195             {
1196                 IncreaseMsgInSend(source_addr);
1197             }
1198             FreeControlMsg(control_msg_tmp);
1199             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1200         }else
1201             status = GNI_RC_ERROR_RESOURCE;
1202
1203     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1204     {
1205         CmiAbort("Memory registor for large msg\n");
1206     }else 
1207     {
1208         status = GNI_RC_ERROR_NOMEM; 
1209         if(!inbuff)
1210             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1211     }
1212     return status;
1213 #else
1214     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1215     if(status == GNI_RC_SUCCESS)
1216     {
1217         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0);  
1218         if(status == GNI_RC_SUCCESS)
1219         {
1220             FreeControlMsg(control_msg_tmp);
1221         }
1222     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1223     {
1224         CmiAbort("Memory registor for large msg\n");
1225     }else 
1226     {
1227         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1228     }
1229     return status;
1230 #endif
1231 }
1232
1233 inline void LrtsPrepareEnvelope(char *msg, int size)
1234 {
1235     CmiSetMsgSize(msg, size);
1236 }
1237
1238 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1239 {
1240     gni_return_t        status  =   GNI_RC_SUCCESS;
1241     uint8_t tag;
1242     CONTROL_MSG         *control_msg_tmp;
1243     int                 oob = ( mode & OUT_OF_BAND);
1244     SMSG_QUEUE          *queue;
1245
1246     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1247 #if CMK_USE_OOB
1248     queue = oob? &smsg_oob_queue : &smsg_queue;
1249 #else
1250     queue = &smsg_queue;
1251 #endif
1252
1253     LrtsPrepareEnvelope(msg, size);
1254
1255 #if PRINT_SYH
1256     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1257 #endif 
1258 #if CMK_SMP && COMM_THREAD_SEND
1259     if(size <= SMSG_MAX_MSG)
1260         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1261     else if (size < BIG_MSG) {
1262         control_msg_tmp =  construct_control_msg(size, msg, 0);
1263         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1264     }
1265     else {
1266           CmiSetMsgSeq(msg, 0);
1267           control_msg_tmp =  construct_control_msg(size, msg, 1);
1268           buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1269     }
1270 #else   //non-smp, smp(worker sending)
1271     if(size <= SMSG_MAX_MSG)
1272     {
1273         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1274             CmiFree(msg);
1275     }
1276     else if (size < BIG_MSG) {
1277         control_msg_tmp =  construct_control_msg(size, msg, 0);
1278         send_large_messages(queue, destNode, control_msg_tmp, 0);
1279     }
1280     else {
1281 #if     USE_LRTS_MEMPOOL
1282         CmiSetMsgSeq(msg, 0);
1283         control_msg_tmp =  construct_control_msg(size, msg, 1);
1284         send_large_messages(queue, destNode, control_msg_tmp, 0);
1285 #else
1286         control_msg_tmp =  construct_control_msg(size, msg, 0);
1287         send_large_messages(queue, destNode, control_msg_tmp, 0);
1288 #endif
1289     }
1290 #endif
1291     return 0;
1292 }
1293
1294 static void    PumpDatagramConnection();
1295 static void registerUserTraceEvents() {
1296 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1297     traceRegisterUserEvent("setting up connections", 10);
1298     traceRegisterUserEvent("Receiving small msgs", 20);
1299     traceRegisterUserEvent("Release local transaction", 30);
1300     traceRegisterUserEvent("Sending buffered small msgs", 40);
1301     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1302 #endif
1303 }
1304
1305 static void ProcessDeadlock()
1306 {
1307     static CmiUInt8 *ptr = NULL;
1308     static CmiUInt8  last = 0, mysum, sum;
1309     static int count = 0;
1310     gni_return_t status;
1311     int i;
1312
1313 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1314 //sweep_mempool(CpvAccess(mempool));
1315     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1316     mysum = smsg_send_count + smsg_recv_count;
1317     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1318     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1319     GNI_RC_CHECK("PMI_Allgather", status);
1320     sum = 0;
1321     for (i=0; i<mysize; i++)  sum+= ptr[i];
1322     if (last == 0 || sum == last) 
1323         count++;
1324     else
1325         count = 0;
1326     last = sum;
1327     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1328     if (count == 2) { 
1329         /* detected twice, it is a real deadlock */
1330         if (myrank == 0)  {
1331             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %d and %d).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1332             CmiAbort("Fatal> Deadlock detected.");
1333         }
1334     }
1335     _detected_hang = 0;
1336 }
1337
1338 static void CheckProgress()
1339 {
1340     if (smsg_send_count == last_smsg_send_count &&
1341         smsg_recv_count == last_smsg_recv_count ) 
1342     {
1343         _detected_hang = 1;
1344 #if !CMK_SMP
1345         if (_detected_hang) ProcessDeadlock();
1346 #endif
1347
1348     }
1349     else {
1350         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1351         last_smsg_send_count = smsg_send_count;
1352         last_smsg_recv_count = smsg_recv_count;
1353         _detected_hang = 0;
1354     }
1355 }
1356
1357 static void set_limit()
1358 {
1359     if (!user_set_flag && CmiMyRank() == 0) {
1360         int mynode = CmiPhysicalNodeID(CmiMyPe());
1361         int numpes = CmiNumPesOnPhysicalNode(mynode);
1362         int numprocesses = numpes / CmiMyNodeSize();
1363         MAX_REG_MEM  = _totalmem / numprocesses;
1364         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1365         if (CmiMyPe() == 0)
1366            printf("mem_max = %d, send_max =%d\n", MAX_REG_MEM, MAX_BUFF_SEND);
1367     }
1368 }
1369
1370 void LrtsPostCommonInit(int everReturn)
1371 {
1372 #if CMK_DIRECT
1373     CmiDirectInit();
1374 #endif
1375 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1376     CpvInitialize(double, projTraceStart);
1377     /* only PE 0 needs to care about registration (to generate sts file). */
1378     if (CmiMyPe() == 0) {
1379         registerMachineUserEventsFunction(&registerUserTraceEvents);
1380     }
1381 #endif
1382
1383 #if CMK_SMP
1384     CmiIdleState *s=CmiNotifyGetState();
1385     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1386     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1387 #else
1388     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1389     if (useDynamicSMSG)
1390     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1391 #endif
1392
1393     if (_checkProgress)
1394 #if CMK_SMP
1395     if (CmiMyRank() == 0)
1396 #endif
1397     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1398
1399     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1400 }
1401
1402 /* this is called by worker thread */
1403 void LrtsPostNonLocal(){
1404 #if CMK_SMP
1405 #if MULTI_THREAD_SEND
1406     if(mysize == 1) return;
1407     PumpLocalRdmaTransactions();
1408 #if CMK_USE_OOB
1409     if (SendBufferMsg(&smsg_oob_queue) == 1)
1410 #endif
1411     SendBufferMsg(&smsg_queue);
1412     SendRdmaMsg();
1413 #endif
1414 #endif
1415 }
1416
1417 /* useDynamicSMSG */
1418 static void    PumpDatagramConnection()
1419 {
1420     uint32_t          remote_address;
1421     uint32_t          remote_id;
1422     gni_return_t status;
1423     gni_post_state_t  post_state;
1424     uint64_t          datagram_id;
1425     int i;
1426
1427    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1428    {
1429        if (datagram_id >= mysize) {           /* bound endpoint */
1430            int pe = datagram_id - mysize;
1431            CMI_GNI_LOCK
1432            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1433            CMI_GNI_UNLOCK
1434            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1435            {
1436                CmiAssert(remote_id == pe);
1437                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1438                GNI_RC_CHECK("Dynamic SMSG Init", status);
1439 #if PRINT_SYH
1440                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1441 #endif
1442                CmiAssert(smsg_connected_flag[pe] == 1);
1443                smsg_connected_flag[pe] = 2;
1444            }
1445        }
1446        else {         /* unbound ep */
1447            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1448            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1449            {
1450                CmiAssert(remote_id<mysize);
1451                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1452                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1453                GNI_RC_CHECK("Dynamic SMSG Init", status);
1454 #if PRINT_SYH
1455                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1456 #endif
1457                smsg_connected_flag[remote_id] = 2;
1458
1459                alloc_smsg_attr(&send_smsg_attr);
1460                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1461                GNI_RC_CHECK("post unbound datagram", status);
1462            }
1463        }
1464    }
1465 }
1466
1467 /* pooling CQ to receive network message */
1468 static void PumpNetworkRdmaMsgs()
1469 {
1470     gni_cq_entry_t      event_data;
1471     gni_return_t        status;
1472
1473 }
1474
1475 inline 
1476 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1477 {
1478     RDMA_REQUEST        *rdma_request_msg;
1479     MallocRdmaRequest(rdma_request_msg);
1480     rdma_request_msg->destNode = inst_id;
1481     rdma_request_msg->pd = pd;
1482 #if CMK_SMP
1483     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1484 #else
1485     if(sendRdmaBuf == 0)
1486     {
1487         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1488     }else{
1489         sendRdmaTail->next = rdma_request_msg;
1490         sendRdmaTail =  rdma_request_msg;
1491     }
1492 #endif
1493
1494 }
1495 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1496 static void PumpNetworkSmsg()
1497 {
1498     uint64_t            inst_id;
1499     int                 ret;
1500     gni_cq_entry_t      event_data;
1501     gni_return_t        status, status2;
1502     void                *header;
1503     uint8_t             msg_tag;
1504     int                 msg_nbytes;
1505     void                *msg_data;
1506     gni_mem_handle_t    msg_mem_hndl;
1507     gni_smsg_attr_t     *smsg_attr;
1508     gni_smsg_attr_t     *remote_smsg_attr;
1509     int                 init_flag;
1510     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1511     uint64_t            source_addr;
1512     SMSG_QUEUE         *queue = &smsg_queue;
1513 #if     CMK_DIRECT
1514     cmidirectMsg        *direct_msg;
1515 #endif
1516     while(1)
1517     {
1518         CMI_GNI_LOCK
1519         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1520         CMI_GNI_UNLOCK
1521         if(status != GNI_RC_SUCCESS)
1522             break;
1523         inst_id = GNI_CQ_GET_INST_ID(event_data);
1524         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1525 #if PRINT_SYH
1526         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1527 #endif
1528         if (useDynamicSMSG) {
1529             /* subtle: smsg may come before connection is setup */
1530             while (smsg_connected_flag[inst_id] != 2) 
1531                PumpDatagramConnection();
1532         }
1533         msg_tag = GNI_SMSG_ANY_TAG;
1534         while(1) {
1535             CMI_GNI_LOCK
1536             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1537             CMI_GNI_UNLOCK
1538             if (status != GNI_RC_SUCCESS)
1539                 break;
1540 #if PRINT_SYH
1541             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1542 #endif
1543             /* copy msg out and then put into queue (small message) */
1544             switch (msg_tag) {
1545             case SMALL_DATA_TAG:
1546             {
1547                 START_EVENT();
1548                 msg_nbytes = CmiGetMsgSize(header);
1549                 msg_data    = CmiAlloc(msg_nbytes);
1550                 memcpy(msg_data, (char*)header, msg_nbytes);
1551 #if CMK_SMP_TRACE_COMMTHREAD
1552                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1553 #endif
1554                 handleOneRecvedMsg(msg_nbytes, msg_data);
1555                 break;
1556             }
1557             case LMSG_INIT_TAG:
1558             {
1559                 getLargeMsgRequest(header, inst_id);
1560                 break;
1561             }
1562             case ACK_TAG:   //msg fit into mempool
1563             {
1564                 /* Get is done, release message . Now put is not used yet*/
1565                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1566 #if ! USE_LRTS_MEMPOOL
1567                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1568 #else
1569                 DecreaseMsgInSend(msg);
1570 #endif
1571                 if(NoMsgInSend(msg))
1572                     buffered_send_msg -= GetMempoolsize(msg);
1573                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1574                 CmiFree(msg);
1575                 break;
1576             }
1577             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1578             {
1579                 header_tmp = (CONTROL_MSG *) header;
1580                 void *msg = (void*)(header_tmp->source_addr);
1581                 int cur_seq = CmiGetMsgSeq(msg);
1582                 int offset = ONE_SEG*(cur_seq+1);
1583                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1584                 buffered_send_msg -= header_tmp->length;
1585                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1586                 if (remain_size < 0) remain_size = 0;
1587                 CmiSetMsgSize(msg, remain_size);
1588                 if(remain_size <= 0) //transaction done
1589                 {
1590                     CmiFree(msg);
1591                 }else if (header_tmp->total_length > offset)
1592                 {
1593                     CmiSetMsgSeq(msg, cur_seq+1);
1594                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1595                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1596                     //send next seg
1597                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1598                          // pipelining
1599                     if (header_tmp->seq_id == 1) {
1600                       int i;
1601                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1602                         int seq = cur_seq+i+2;
1603                         CmiSetMsgSeq(msg, seq-1);
1604                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1605                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1606                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1607                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1608                       }
1609                     }
1610                 }
1611                 break;
1612             }
1613 #if CMK_PERSISTENT_COMM
1614             case PUT_DONE_TAG: //persistent message
1615                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1616                 int size = ((CONTROL_MSG *) header)->length;
1617                 CmiReference(msg);
1618                 handleOneRecvedMsg(size, msg); 
1619                 break;
1620 #endif
1621 #if CMK_DIRECT
1622             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1623                 //create a trigger message
1624                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
1625                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
1626                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
1627                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
1628                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1629                 break;
1630 #endif
1631             default: {
1632                 printf("weird tag problem\n");
1633                 CmiAbort("Unknown tag\n");
1634                      }
1635             }               // end switch
1636 #if PRINT_SYH
1637             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1638 #endif
1639             CMI_GNI_LOCK
1640             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1641             CMI_GNI_UNLOCK
1642             smsg_recv_count ++;
1643             msg_tag = GNI_SMSG_ANY_TAG;
1644         } //endwhile getNext
1645     }   //end while GetEvent
1646     if(status == GNI_RC_ERROR_RESOURCE)
1647     {
1648         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
1649         GNI_RC_CHECK("Smsg_rx_cq full", status);
1650     }
1651 }
1652
1653 static void printDesc(gni_post_descriptor_t *pd)
1654 {
1655     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1656 }
1657
1658 // for BIG_MSG called on receiver side for receiving control message
1659 // LMSG_INIT_TAG
1660 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1661 {
1662 #if     USE_LRTS_MEMPOOL
1663     CONTROL_MSG         *request_msg;
1664     gni_return_t        status = GNI_RC_SUCCESS;
1665     void                *msg_data;
1666     gni_post_descriptor_t *pd;
1667     gni_mem_handle_t    msg_mem_hndl;
1668     int source, size, transaction_size, offset = 0;
1669     int     register_size = 0;
1670
1671     // initial a get to transfer data from the sender side */
1672     request_msg = (CONTROL_MSG *) header;
1673     size = request_msg->total_length;
1674     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1675     if(request_msg->seq_id < 2)   {
1676         msg_data = CmiAlloc(size);
1677         CmiSetMsgSeq(msg_data, 0);
1678         _MEMCHECK(msg_data);
1679     }
1680     else {
1681         offset = ONE_SEG*(request_msg->seq_id-1);
1682         msg_data = (char*)request_msg->dest_addr + offset;
1683     }
1684    
1685     MallocPostDesc(pd);
1686     pd->cqwrite_value = request_msg->seq_id;
1687     if( request_msg->seq_id == 0)
1688     {
1689         pd->local_mem_hndl= GetMemHndl(msg_data);
1690         transaction_size = ALIGN64(size);
1691         if(IsMemHndlZero(pd->local_mem_hndl))
1692         {   
1693             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1694             if(status == GNI_RC_SUCCESS)
1695             {
1696                 pd->local_mem_hndl = GetMemHndl(msg_data);
1697             }
1698             else
1699             {
1700                 SetMemHndlZero(pd->local_mem_hndl);
1701             }
1702         }
1703         if(NoMsgInRecv( (void*)(msg_data)))
1704             register_size = GetMempoolsize((void*)(msg_data));
1705         else
1706             register_size = 0;
1707     }
1708     else{
1709         transaction_size = ALIGN64(request_msg->length);
1710         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1711         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1712         {
1713             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1714         }
1715     }
1716     pd->first_operand = ALIGN64(size);                   //  total length
1717
1718     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1719         pd->type            = GNI_POST_FMA_GET;
1720     else
1721         pd->type            = GNI_POST_RDMA_GET;
1722 #if REMOTE_EVENT
1723     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1724 #else
1725     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1726 #endif
1727     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1728     pd->length          = transaction_size;
1729     pd->local_addr      = (uint64_t) msg_data;
1730     pd->remote_addr     = request_msg->source_addr + offset;
1731     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1732     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1733     pd->rdma_mode       = 0;
1734     pd->amo_cmd         = 0;
1735
1736     //memory registration success
1737     if(status == GNI_RC_SUCCESS)
1738     {
1739         CMI_GNI_LOCK
1740         if(pd->type == GNI_POST_RDMA_GET) 
1741             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1742         else
1743             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1744         CMI_GNI_UNLOCK
1745
1746         if(status == GNI_RC_SUCCESS )
1747         {
1748             if(pd->cqwrite_value == 0)
1749             {
1750 #if MACHINE_DEBUG_LOG
1751                 buffered_recv_msg += register_size;
1752                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1753 #endif
1754                 IncreaseMsgInRecv(msg_data);
1755             }
1756         }
1757     }else
1758     {
1759         SetMemHndlZero((pd->local_mem_hndl));
1760     }
1761     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1762     {
1763         bufferRdmaMsg(inst_id, pd); 
1764     }else {
1765          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1766         GNI_RC_CHECK("GetLargeAFter posting", status);
1767     }
1768 #else
1769     CONTROL_MSG         *request_msg;
1770     gni_return_t        status;
1771     void                *msg_data;
1772     gni_post_descriptor_t *pd;
1773     RDMA_REQUEST        *rdma_request_msg;
1774     gni_mem_handle_t    msg_mem_hndl;
1775     //int source;
1776     // initial a get to transfer data from the sender side */
1777     request_msg = (CONTROL_MSG *) header;
1778     msg_data = CmiAlloc(request_msg->length);
1779     _MEMCHECK(msg_data);
1780
1781     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1782
1783     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1784     {
1785         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1786     }
1787
1788     MallocPostDesc(pd);
1789     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1790         pd->type            = GNI_POST_FMA_GET;
1791     else
1792         pd->type            = GNI_POST_RDMA_GET;
1793 #if REMOTE_EVENT
1794     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1795 #else
1796     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1797 #endif
1798     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1799     pd->length          = ALIGN64(request_msg->length);
1800     pd->local_addr      = (uint64_t) msg_data;
1801     pd->remote_addr     = request_msg->source_addr;
1802     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1803     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1804     pd->rdma_mode       = 0;
1805     pd->amo_cmd         = 0;
1806
1807     //memory registration successful
1808     if(status == GNI_RC_SUCCESS)
1809     {
1810         pd->local_mem_hndl  = msg_mem_hndl;
1811         CMI_GNI_LOCK
1812         if(pd->type == GNI_POST_RDMA_GET) 
1813             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1814         else
1815             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1816         CMI_GNI_UNLOCK
1817     }else
1818     {
1819         SetMemHndlZero(pd->local_mem_hndl);
1820     }
1821     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1822     {
1823         MallocRdmaRequest(rdma_request_msg);
1824         rdma_request_msg->next = 0;
1825         rdma_request_msg->destNode = inst_id;
1826         rdma_request_msg->pd = pd;
1827         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1828     }else {
1829         GNI_RC_CHECK("AFter posting", status);
1830     }
1831 #endif
1832 }
1833
1834 static void PumpLocalRdmaTransactions()
1835 {
1836     gni_cq_entry_t          ev;
1837     gni_return_t            status;
1838     uint64_t                type, inst_id;
1839     gni_post_descriptor_t   *tmp_pd;
1840     MSG_LIST                *ptr;
1841     CONTROL_MSG             *ack_msg_tmp;
1842     ACK_MSG                 *ack_msg;
1843     uint8_t                 msg_tag;
1844 #if CMK_DIRECT
1845     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1846 #endif
1847     SMSG_QUEUE         *queue = &smsg_queue;
1848
1849     while(1) {
1850         CMI_GNI_LOCK 
1851         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1852         CMI_GNI_UNLOCK
1853         if(status != GNI_RC_SUCCESS) break;
1854         
1855         type = GNI_CQ_GET_TYPE(ev);
1856         if (type == GNI_CQ_EVENT_TYPE_POST)
1857         {
1858             inst_id     = GNI_CQ_GET_INST_ID(ev);
1859 #if PRINT_SYH
1860             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1861 #endif
1862             CMI_GNI_LOCK
1863             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1864             CMI_GNI_UNLOCK
1865
1866             switch (tmp_pd->type) {
1867 #if CMK_PERSISTENT_COMM || CMK_DIRECT
1868             case GNI_POST_RDMA_PUT:
1869 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
1870                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1871 #endif
1872             case GNI_POST_FMA_PUT:
1873                 if(tmp_pd->amo_cmd == 1) {
1874                     //sender ACK to receiver to trigger it is done
1875                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1876                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
1877                     msg_tag = DIRECT_PUT_DONE_TAG;
1878                 }
1879                 else {
1880                     CmiFree((void *)tmp_pd->local_addr);
1881                     MallocControlMsg(ack_msg_tmp);
1882                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1883                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1884                     msg_tag = PUT_DONE_TAG;
1885                 }
1886                 break;
1887 #endif
1888             case GNI_POST_RDMA_GET:
1889             case GNI_POST_FMA_GET:  {
1890 #if  ! USE_LRTS_MEMPOOL
1891                 MallocControlMsg(ack_msg_tmp);
1892                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1893                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1894                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1895                 msg_tag = ACK_TAG;  
1896 #else
1897                 int seq_id = tmp_pd->cqwrite_value;
1898                 if(seq_id > 0)      // BIG_MSG
1899                 {
1900                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1901                     MallocControlMsg(ack_msg_tmp);
1902                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1903                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1904                     ack_msg_tmp->seq_id = seq_id;
1905                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1906                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1907                     ack_msg_tmp->length = tmp_pd->length;
1908                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1909                     msg_tag = BIG_MSG_TAG; 
1910                 } 
1911                 else
1912                 {
1913                     MallocAckMsg(ack_msg);
1914                     ack_msg->source_addr = tmp_pd->remote_addr;
1915                     msg_tag = ACK_TAG;  
1916                     // ack_msg_tmp->dest_addr = tmp_pd->local_addr; ???
1917                 }
1918 #endif
1919                 break;
1920             }
1921             default:
1922                 CmiPrintf("type=%d\n", tmp_pd->type);
1923                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1924             }      /* end of switch */
1925
1926 #if CMK_DIRECT
1927             if (tmp_pd->amo_cmd == 1) {
1928                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1929                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
1930             }
1931             else
1932 #endif
1933             if (msg_tag == ACK_TAG) {
1934                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
1935                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
1936             }
1937             else {
1938                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
1939                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
1940             }
1941 #if CMK_PERSISTENT_COMM
1942             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1943 #endif
1944             {
1945                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1946 #if PRINT_SYH
1947                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1948 #endif
1949 #if CMK_SMP_TRACE_COMMTHREAD
1950                     START_EVENT();
1951 #endif
1952                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1953                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1954 #if MACHINE_DEBUG_LOG
1955                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1956                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1957                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1958 #endif
1959 #if CMK_SMP_TRACE_COMMTHREAD
1960                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1961 #endif
1962                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1963                 }else if(msg_tag == BIG_MSG_TAG){
1964                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1965                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1966                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1967 #if CMK_SMP_TRACE_COMMTHREAD
1968                         START_EVENT();
1969 #endif
1970 #if PRINT_SYH
1971                         printf("Pipeline msg done [%d]\n", myrank);
1972 #endif
1973 #if CMK_SMP_TRACE_COMMTHREAD
1974                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1975 #endif
1976                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1977                     }
1978                 }
1979             }
1980             FreePostDesc(tmp_pd);
1981         }
1982     } //end while
1983     if(status == GNI_RC_ERROR_RESOURCE)
1984     {
1985         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
1986         GNI_RC_CHECK("Smsg_tx_cq full", status);
1987     }
1988 }
1989
1990 static void  SendRdmaMsg()
1991 {
1992     gni_return_t            status = GNI_RC_SUCCESS;
1993     gni_mem_handle_t        msg_mem_hndl;
1994     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
1995     RDMA_REQUEST            *pre = 0;
1996     int i, register_size = 0;
1997     void                    *msg;
1998 #if CMK_SMP
1999     int len = PCQueueLength(sendRdmaBuf);
2000     for (i=0; i<len; i++)
2001     {
2002         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2003         if (ptr == NULL) break;
2004 #else
2005     ptr = sendRdmaBuf;
2006     while (ptr!=0)
2007     {
2008 #endif 
2009         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2010         gni_post_descriptor_t *pd = ptr->pd;
2011         status = GNI_RC_SUCCESS;
2012         
2013         if(pd->cqwrite_value == 0)
2014         {
2015             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2016             {
2017                 msg = (void*)(pd->local_addr);
2018                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
2019                 if(status == GNI_RC_SUCCESS)
2020                 {
2021                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2022                 }
2023             }else
2024             {
2025                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2026             }
2027             if(NoMsgInRecv( (void*)(pd->local_addr)))
2028                 register_size = GetMempoolsize((void*)(pd->local_addr));
2029             else
2030                 register_size = 0;
2031         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2032         {
2033             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
2034         }
2035         if(status == GNI_RC_SUCCESS)        //mem register good
2036         {
2037             CMI_GNI_LOCK
2038             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
2039                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2040             else
2041                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
2042             CMI_GNI_UNLOCK
2043             if(status == GNI_RC_SUCCESS)    //post good
2044             {
2045 #if !CMK_SMP
2046                 tmp_ptr = ptr;
2047                 if(pre != 0) {
2048                     pre->next = ptr->next;
2049                 }
2050                 else {
2051                     sendRdmaBuf = ptr->next;
2052                 }
2053                 ptr = ptr->next;
2054                 FreeRdmaRequest(tmp_ptr);
2055 #endif
2056                 if(pd->cqwrite_value == 0)
2057                 {
2058                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
2059                 }
2060 #if MACHINE_DEBUG_LOG
2061                 buffered_recv_msg += register_size;
2062                 MACHSTATE(8, "GO request from buffered\n"); 
2063 #endif
2064             }else           // cannot post
2065             {
2066 #if CMK_SMP
2067                 PCQueuePush(sendRdmaBuf, (char*)ptr);
2068 #else
2069                 pre = ptr;
2070                 ptr = ptr->next;
2071 #endif
2072                 break;
2073             }
2074         } else          //memory registration fails
2075         {
2076 #if CMK_SMP
2077             PCQueuePush(sendRdmaBuf, (char*)ptr);
2078 #else
2079             pre = ptr;
2080             ptr = ptr->next;
2081 #endif
2082         }
2083     } //end while
2084 #if ! CMK_SMP
2085     if(ptr == 0)
2086         sendRdmaTail = pre;
2087 #endif
2088 }
2089
2090 // return 1 if all messages are sent
2091 static int SendBufferMsg(SMSG_QUEUE *queue)
2092 {
2093     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2094     CONTROL_MSG         *control_msg_tmp;
2095     gni_return_t        status;
2096     int                 done = 1;
2097     int                 register_size;
2098     void                *register_addr;
2099     int                 index_previous = -1;
2100 #if CMI_EXERT_SEND_CAP
2101     int                 sent_cnt = 0;
2102 #endif
2103
2104 #if CMK_SMP
2105     static int          index = 0;
2106     int                 idx;
2107 #if ONE_SEND_QUEUE
2108     int                 *destpe;
2109     destpe = (int*)CmiTmpAlloc(mysize * sizeof(int));
2110     memset(destpe, 0, mysize * sizeof(int));
2111     for (idx=0; idx<1; idx++)
2112     {
2113         int i, len = PCQueueLength(queue->sendMsgBuf);
2114         for (i=0; i<len; i++) 
2115         {
2116             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2117             if (ptr == 0) break;
2118             if (destpe[ptr->destNode] != 0) {       /* can't send to this pe */
2119                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2120                 continue;
2121             }
2122 #else
2123     for (idx=0; idx<mysize; idx++)
2124     {
2125         int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
2126         for (i=0; i<len; i++) 
2127         {
2128             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
2129             if (ptr == 0) break;
2130 #endif
2131 #else
2132     int index = queue->smsg_head_index;
2133     while(index != -1)
2134     {
2135         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2136         pre = 0;
2137         while(ptr != 0)
2138         {
2139 #endif
2140             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2141             status = GNI_RC_ERROR_RESOURCE;
2142             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2143                 /* connection not exists yet */
2144             }
2145             else
2146             switch(ptr->tag)
2147             {
2148             case SMALL_DATA_TAG:
2149                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2150                 if(status == GNI_RC_SUCCESS)
2151                 {
2152                     CmiFree(ptr->msg);
2153                 }
2154                 break;
2155             case LMSG_INIT_TAG:
2156                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2157                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2158                 break;
2159             case ACK_TAG:
2160                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(ACK_MSG), ptr->tag, 1);  
2161                 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2162                 break;
2163             case BIG_MSG_TAG:
2164                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, CONTROL_MSG_SIZE, ptr->tag, 1);  
2165                 if(status == GNI_RC_SUCCESS)
2166                 {
2167                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2168                 }
2169                 break;
2170 #if CMK_DIRECT
2171             case DIRECT_PUT_DONE_TAG:
2172                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2173                 if(status == GNI_RC_SUCCESS)
2174                 {
2175                     free((CMK_DIRECT_HEADER*)ptr->msg);
2176                 }
2177                 break;
2178
2179 #endif
2180             default:
2181                 printf("Weird tag\n");
2182                 CmiAbort("should not happen\n");
2183             }       // end switch
2184             if(status == GNI_RC_SUCCESS)
2185             {
2186 #if !CMK_SMP
2187                 tmp_ptr = ptr;
2188                 if(pre)
2189                 {
2190                     ptr = pre ->next = ptr->next;
2191                 }else
2192                 {
2193                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2194                 }
2195                 FreeMsgList(tmp_ptr);
2196 #else
2197                 FreeMsgList(ptr);
2198 #endif
2199 #if PRINT_SYH
2200                 buffered_smsg_counter--;
2201                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2202 #endif
2203 #if CMI_EXERT_SEND_CAP
2204                 sent_cnt++;
2205                 if(sent_cnt == SEND_CAP)
2206                     break;
2207 #endif
2208             }else {
2209 #if CMK_SMP
2210 #if ONE_SEND_QUEUE
2211                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2212 #else
2213                 PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
2214 #endif
2215 #else
2216                 pre = ptr;
2217                 ptr=ptr->next;
2218 #endif
2219                 done = 0;
2220                 if(status == GNI_RC_ERROR_RESOURCE)
2221                 {
2222 #if CMK_SMP && ONE_SEND_QUEUE 
2223                     destpe[ptr->destNode] = 1;
2224 #else
2225                     break;
2226 #endif
2227                 }
2228             } 
2229         } //end while
2230 #if !CMK_SMP
2231         if(ptr == 0)
2232             queue->smsg_msglist_index[index].tail = pre;
2233         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2234         {
2235             if(index_previous != -1)
2236                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2237             else
2238                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2239         }
2240         else {
2241             index_previous = index;
2242         }
2243         index = queue->smsg_msglist_index[index].next;
2244 #else
2245         index = (index+1)%mysize;
2246 #endif
2247
2248 #if CMI_EXERT_SEND_CAP
2249         if(sent_cnt == SEND_CAP)
2250                 break;
2251 #endif
2252     }   // end pooling for all cores
2253 #if ONE_SEND_QUEUE
2254     CmiTmpFree(destpe);
2255 #endif
2256     return done;
2257 }
2258
2259 static void ProcessDeadlock();
2260 void LrtsAdvanceCommunication(int whileidle)
2261 {
2262     /*  Receive Msg first */
2263 #if CMK_SMP_TRACE_COMMTHREAD
2264     double startT, endT;
2265 #endif
2266     if (useDynamicSMSG && whileidle)
2267     {
2268 #if CMK_SMP_TRACE_COMMTHREAD
2269         startT = CmiWallTimer();
2270 #endif
2271         PumpDatagramConnection();
2272 #if CMK_SMP_TRACE_COMMTHREAD
2273         endT = CmiWallTimer();
2274         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2275 #endif
2276     }
2277
2278 #if CMK_SMP_TRACE_COMMTHREAD
2279     startT = CmiWallTimer();
2280 #endif
2281     PumpNetworkSmsg();
2282     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2283 #if CMK_SMP_TRACE_COMMTHREAD
2284     endT = CmiWallTimer();
2285     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2286 #endif
2287
2288 #if CMK_SMP_TRACE_COMMTHREAD
2289     startT = CmiWallTimer();
2290 #endif
2291     PumpLocalRdmaTransactions();
2292     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2293 #if CMK_SMP_TRACE_COMMTHREAD
2294     endT = CmiWallTimer();
2295     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2296 #endif
2297     /* Send buffered Message */
2298 #if CMK_SMP_TRACE_COMMTHREAD
2299     startT = CmiWallTimer();
2300 #endif
2301 #if CMK_USE_OOB
2302     if (SendBufferMsg(&smsg_oob_queue) == 1)
2303 #endif
2304     SendBufferMsg(&smsg_queue);
2305     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2306 #if CMK_SMP_TRACE_COMMTHREAD
2307     endT = CmiWallTimer();
2308     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2309 #endif
2310
2311 #if CMK_SMP_TRACE_COMMTHREAD
2312     startT = CmiWallTimer();
2313 #endif
2314     SendRdmaMsg();
2315     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2316 #if CMK_SMP_TRACE_COMMTHREAD
2317     endT = CmiWallTimer();
2318     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2319 #endif
2320
2321 #if CMK_SMP
2322     if (_detected_hang)  ProcessDeadlock();
2323 #endif
2324 }
2325
2326 /* useDynamicSMSG */
2327 static void _init_dynamic_smsg()
2328 {
2329     gni_return_t status;
2330     uint32_t     vmdh_index = -1;
2331     int i;
2332
2333     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2334     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2335     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2336     for(i=0; i<mysize; i++) {
2337         smsg_connected_flag[i] = 0;
2338         smsg_attr_vector_local[i] = NULL;
2339         smsg_attr_vector_remote[i] = NULL;
2340     }
2341     if(mysize <=512)
2342     {
2343         SMSG_MAX_MSG = 4096;
2344     }else if (mysize <= 4096)
2345     {
2346         SMSG_MAX_MSG = 4096/mysize * 1024;
2347     }else if (mysize <= 16384)
2348     {
2349         SMSG_MAX_MSG = 512;
2350     }else {
2351         SMSG_MAX_MSG = 256;
2352     }
2353
2354     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2355     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2356     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2357     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2358     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2359
2360     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2361     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2362     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2363     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2364     mailbox_list->offset = 0;
2365     mailbox_list->next = 0;
2366     
2367     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2368         mailbox_list->size, smsg_rx_cqh,
2369         GNI_MEM_READWRITE,   
2370         vmdh_index,
2371         &(mailbox_list->mem_hndl));
2372     GNI_RC_CHECK("MEMORY registration for smsg", status);
2373
2374     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2375     GNI_RC_CHECK("Unbound EP", status);
2376     
2377     alloc_smsg_attr(&send_smsg_attr);
2378
2379     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2380     GNI_RC_CHECK("post unbound datagram", status);
2381
2382       /* always pre-connect to proc 0 */
2383     //if (myrank != 0) connect_to(0);
2384 }
2385
2386 static void _init_static_smsg()
2387 {
2388     gni_smsg_attr_t      *smsg_attr;
2389     gni_smsg_attr_t      remote_smsg_attr;
2390     gni_smsg_attr_t      *smsg_attr_vec;
2391     gni_mem_handle_t     my_smsg_mdh_mailbox;
2392     int      ret, i;
2393     gni_return_t status;
2394     uint32_t              vmdh_index = -1;
2395     mdh_addr_t            base_infor;
2396     mdh_addr_t            *base_addr_vec;
2397     char *env;
2398
2399     if(mysize <=512)
2400     {
2401         SMSG_MAX_MSG = 1024;
2402     }else if (mysize <= 4096)
2403     {
2404         SMSG_MAX_MSG = 1024;
2405     }else if (mysize <= 16384)
2406     {
2407         SMSG_MAX_MSG = 512;
2408     }else {
2409         SMSG_MAX_MSG = 256;
2410     }
2411     
2412     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2413     if (env) SMSG_MAX_MSG = atoi(env);
2414     CmiAssert(SMSG_MAX_MSG > 0);
2415
2416     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2417     
2418     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2419     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2420     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2421     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2422     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2423     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2424     CmiAssert(ret == 0);
2425     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2426     
2427     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2428             smsg_memlen*(mysize), smsg_rx_cqh,
2429             GNI_MEM_READWRITE,   
2430             vmdh_index,
2431             &my_smsg_mdh_mailbox);
2432     register_memory_size += smsg_memlen*(mysize);
2433     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2434
2435     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2436     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2437
2438     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2439     base_infor.mdh =  my_smsg_mdh_mailbox;
2440     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2441
2442     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2443  
2444     for(i=0; i<mysize; i++)
2445     {
2446         if(i==myrank)
2447             continue;
2448         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2449         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2450         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2451         smsg_attr[i].mbox_offset = i*smsg_memlen;
2452         smsg_attr[i].buff_size = smsg_memlen;
2453         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2454         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2455     }
2456
2457     for(i=0; i<mysize; i++)
2458     {
2459         if (myrank == i) continue;
2460
2461         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2462         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2463         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2464         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2465         remote_smsg_attr.buff_size = smsg_memlen;
2466         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2467         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2468
2469         /* initialize the smsg channel */
2470         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2471         GNI_RC_CHECK("SMSG Init", status);
2472     } //end initialization
2473
2474     free(base_addr_vec);
2475     free(smsg_attr);
2476
2477     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2478     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2479
2480
2481 inline
2482 static void _init_send_queue(SMSG_QUEUE *queue)
2483 {
2484      int i;
2485 #if ONE_SEND_QUEUE
2486      queue->sendMsgBuf = PCQueueCreate();
2487 #else
2488      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2489      for(i =0; i<mysize; i++)
2490      {
2491 #if CMK_SMP
2492         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2493 #else
2494         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2495         queue->smsg_msglist_index[i].next = -1;
2496 #endif
2497         
2498      }
2499 #if ! CMK_SMP
2500      queue->smsg_head_index = -1;
2501 #endif
2502 #endif
2503 }
2504
2505 inline
2506 static void _init_smsg()
2507 {
2508     if(mysize > 1) {
2509         if (useDynamicSMSG)
2510             _init_dynamic_smsg();
2511         else
2512             _init_static_smsg();
2513     }
2514
2515     _init_send_queue(&smsg_queue);
2516 #if CMK_USE_OOB
2517     _init_send_queue(&smsg_oob_queue);
2518 #endif
2519 }
2520
2521 static void _init_static_msgq()
2522 {
2523     gni_return_t status;
2524     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2525     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2526     msgq_attrs.smsg_q_sz = 1;
2527     msgq_attrs.rcv_pool_sz = 1;
2528     msgq_attrs.num_msgq_eps = 2;
2529     msgq_attrs.nloc_insts = 8;
2530     msgq_attrs.modes = 0;
2531     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2532
2533     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2534     GNI_RC_CHECK("MSGQ Init", status);
2535
2536
2537 }
2538
2539 #if CMK_SMP && STEAL_MEMPOOL
2540 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2541 {
2542     void *pool = NULL;
2543     int i, k;
2544     // check other ranks
2545     for (k=0; k<CmiMyNodeSize()+1; k++) {
2546         i = (CmiMyRank()+k)%CmiMyNodeSize();
2547         if (i==CmiMyRank()) continue;
2548         mempool_type *mptr = CpvAccessOther(mempool, i);
2549         CmiLock(mptr->mempoolLock);
2550         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2551         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2552             CmiUnlock(mptr->mempoolLock);
2553             continue;
2554         }
2555         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2556         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2557             /* search in the free list */
2558           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2559           mempool_header *current = free_header;
2560           while (current) {
2561             if (current->next_free == (char*)header-(char*)mptr) break;
2562             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2563           }
2564           if (current == NULL) {         /* not found in free list */
2565             CmiUnlock(mptr->mempoolLock);
2566             continue;
2567           }
2568 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2569             /* search the previous memblock, and remove the tail */
2570           mempool_block *ptr = (mempool_block *)mptr;
2571           while (ptr) {
2572             if (ptr->memblock_next == mptr->memblock_tail) break;
2573             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2574           }
2575           CmiAssert(ptr!=NULL);
2576           ptr->memblock_next = 0;
2577           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2578
2579             /* remove memblock from the free list */
2580           current->next_free = header->next_free;
2581           if (header == free_header) mptr->freelist_head = header->next_free;
2582
2583           CmiUnlock(mptr->mempoolLock);
2584
2585           pool = (void*)tail;
2586           *mem_hndl = tail->mem_hndl;
2587           *size = tail->size;
2588           return pool;
2589         }
2590         CmiUnlock(mptr->mempoolLock);
2591     }
2592
2593       /* steal failed, deregister and free memblock now */
2594     int freed = 0;
2595     for (k=0; k<CmiMyNodeSize()+1; k++) {
2596         i = (CmiMyRank()+k)%CmiMyNodeSize();
2597         mempool_type *mptr = CpvAccessOther(mempool, i);
2598         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2599
2600         mempool_block *mempools_head = &(mptr->mempools_head);
2601         mempool_block *current = mempools_head;
2602         mempool_block *prev = NULL;
2603
2604         while (current) {
2605           int isfree = 0;
2606           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2607 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2608           mempool_header *cur = free_header;
2609           mempool_header *header;
2610           if (current != mempools_head) {
2611             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2612              /* search in free list */
2613             if (header->size == current->size - sizeof(mempool_block)) {
2614               cur = free_header;
2615               while (cur) {
2616                 if (cur->next_free == (char*)header-(char*)mptr) break;
2617                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2618               }
2619               if (cur != NULL) isfree = 1;
2620             }
2621           }
2622           if (isfree) {
2623               /* remove from free list */
2624             cur->next_free = header->next_free;
2625             if (header == free_header) mptr->freelist_head = header->next_free;
2626              // deregister
2627             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2628             GNI_RC_CHECK("steal Mempool de-register", status);
2629             mempool_block *ptr = current;
2630             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2631             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2632 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2633             freed += ptr->size;
2634             free(ptr);
2635              // try now
2636             if (freed > *size) {
2637               if (pool == NULL) {
2638                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2639                 CmiAssert(ret == 0);
2640               }
2641               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2642               if (status == GNI_RC_SUCCESS) {
2643                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2644 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2645                 return pool;
2646               }
2647 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2648             }
2649           }
2650           else {
2651              prev = current;
2652              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2653           }
2654         }
2655
2656         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2657     }
2658       /* still no luck registering pool */
2659     if (pool) free(pool);
2660     return NULL;
2661 }
2662 #endif
2663
2664 static long long int total_mempool_size = 0;
2665 static long long int total_mempool_calls = 0;
2666
2667 #if USE_LRTS_MEMPOOL
2668 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2669 {
2670     void *pool;
2671     int ret;
2672
2673     int default_size =  expand_flag? _expand_mem : _mempool_size;
2674     if (*size < default_size) *size = default_size;
2675     total_mempool_size += *size;
2676     total_mempool_calls += 1;
2677     if (*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) {
2678         printf("Error: A mempool block with size %d is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX.", *size);
2679         CmiAbort("alloc_mempool_block");
2680     }
2681     ret = posix_memalign(&pool, ALIGNBUF, *size);
2682     if (ret != 0) {
2683 #if CMK_SMP && STEAL_MEMPOOL
2684       pool = steal_mempool_block(size, mem_hndl);
2685       if (pool != NULL) return pool;
2686 #endif
2687       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2688       if (ret == ENOMEM)
2689         CmiAbort("alloc_mempool_block: out of memory.");
2690       else
2691         CmiAbort("alloc_mempool_block: posix_memalign failed");
2692     }
2693     SetMemHndlZero((*mem_hndl));
2694     
2695     return pool;
2696 }
2697
2698 // ptr is a block head pointer
2699 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2700 {
2701     if(!(IsMemHndlZero(mem_hndl)))
2702     {
2703         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
2704     }
2705     free(ptr);
2706 }
2707 #endif
2708
2709 void LrtsPreCommonInit(int everReturn){
2710 #if USE_LRTS_MEMPOOL
2711     CpvInitialize(mempool_type*, mempool);
2712     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
2713     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2714 #endif
2715 }
2716
2717
2718 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2719 {
2720     register int            i;
2721     int                     rc;
2722     int                     device_id = 0;
2723     unsigned int            remote_addr;
2724     gni_cdm_handle_t        cdm_hndl;
2725     gni_return_t            status = GNI_RC_SUCCESS;
2726     uint32_t                vmdh_index = -1;
2727     uint8_t                 ptag;
2728     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2729     int                     first_spawned;
2730     int                     physicalID;
2731     char                   *env;
2732
2733     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2734     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2735     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2736    
2737     status = PMI_Init(&first_spawned);
2738     GNI_RC_CHECK("PMI_Init", status);
2739
2740     status = PMI_Get_size(&mysize);
2741     GNI_RC_CHECK("PMI_Getsize", status);
2742
2743     status = PMI_Get_rank(&myrank);
2744     GNI_RC_CHECK("PMI_getrank", status);
2745
2746     //physicalID = CmiPhysicalNodeID(myrank);
2747     
2748     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2749
2750     *myNodeID = myrank;
2751     *numNodes = mysize;
2752   
2753 #if MULTI_THREAD_SEND
2754     /* Currently, we only consider the case that comm. thread will only recv msgs */
2755     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2756 #endif
2757
2758     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2759     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2760     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2761
2762     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2763     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2764     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2765
2766     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
2767     if (env) useDynamicSMSG = 1;
2768     if (!useDynamicSMSG)
2769       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2770     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2771     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2772     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2773     
2774     if(myrank == 0)
2775     {
2776         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2777         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2778     }
2779 #ifdef USE_ONESIDED
2780     onesided_init(NULL, &onesided_hnd);
2781
2782     // this is a GNI test, so use the libonesided bypass functionality
2783     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2784     local_addr = gniGetNicAddress();
2785 #else
2786     ptag = get_ptag();
2787     cookie = get_cookie();
2788 #if 0
2789     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2790 #endif
2791     //Create and attach to the communication  domain */
2792     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2793     GNI_RC_CHECK("GNI_CdmCreate", status);
2794     //* device id The device id is the minor number for the device
2795     //that is assigned to the device by the system when the device is created.
2796     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2797     //where X is the device number 0 default 
2798     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2799     GNI_RC_CHECK("GNI_CdmAttach", status);
2800     local_addr = get_gni_nic_address(0);
2801 #endif
2802     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2803     _MEMCHECK(MPID_UGNI_AllAddr);
2804     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2805     /* create the local completion queue */
2806     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2807     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2808     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2809     
2810     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2811     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2812     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2813
2814     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2815     GNI_RC_CHECK("Create CQ (rx)", status);
2816     
2817     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2818     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2819     
2820     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2821     //GNI_RC_CHECK("Create BTE CQ", status);
2822
2823     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2824     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2825     _MEMCHECK(ep_hndl_array);
2826 #if CMK_SMP && !COMM_THREAD_SEND
2827     tx_cq_lock  = CmiCreateLock();
2828     rx_cq_lock  = CmiCreateLock();
2829 #endif
2830     for (i=0; i<mysize; i++) {
2831         if(i == myrank) continue;
2832         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2833         GNI_RC_CHECK("GNI_EpCreate ", status);   
2834         remote_addr = MPID_UGNI_AllAddr[i];
2835         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2836         GNI_RC_CHECK("GNI_EpBind ", status);   
2837     }
2838
2839     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2840     _init_smsg();
2841     PMI_Barrier();
2842
2843 #if     USE_LRTS_MEMPOOL
2844     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
2845     if (env) {
2846         _totalmem = CmiReadSize(env);
2847         if (myrank == 0)
2848             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
2849     }
2850
2851     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
2852     if (env) _mempool_size = CmiReadSize(env);
2853     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
2854         _mempool_size = CmiReadSize(env);
2855
2856
2857     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2858     if (env) {
2859         MAX_REG_MEM = CmiReadSize(env);
2860         user_set_flag = 1;
2861     }
2862     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
2863         MAX_REG_MEM = CmiReadSize(env);
2864         user_set_flag = 1;
2865     }
2866
2867     env = getenv("CHARM_UGNI_SEND_MAX");
2868     if (env) {
2869         MAX_BUFF_SEND = CmiReadSize(env);
2870         user_set_flag = 1;
2871     }
2872     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
2873         MAX_BUFF_SEND = CmiReadSize(env);
2874         user_set_flag = 1;
2875     }
2876
2877     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
2878     if (env) {
2879         _mempool_size_limit = CmiReadSize(env);
2880     }
2881
2882     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2883     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2884
2885     if (myrank==0) {
2886         printf("Charm++> memory pool init size: %1.fMB, max size: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
2887         printf("Charm++> memory pool max size: %1.fMB, max for send: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
2888         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
2889             /* memblock can expand to BIG_MSG * 2 size */
2890             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
2891             CmiAbort("mempool maximum size is too small. \n");
2892         }
2893 #if CMK_SMP && MULTI_THREAD_SEND
2894         printf("Charm++> worker thread sending messages\n");
2895 #elif CMK_SMP && COMM_THREAD_SEND
2896         printf("Charm++> only comm thread send/recv messages\n");
2897 #endif
2898     }
2899
2900 #endif     /* end of USE_LRTS_MEMPOOL */
2901
2902     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
2903     if (env) {
2904         BIG_MSG = CmiReadSize(env);
2905         if (BIG_MSG < ONE_SEG)
2906           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
2907     }
2908     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
2909     if (env) {
2910         BIG_MSG_PIPELINE = atoi(env);
2911     }
2912
2913     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
2914     if (env) _checkProgress = 0;
2915     if (mysize == 1) _checkProgress = 0;
2916
2917     /* init DMA buffer for medium message */
2918
2919     //_init_DMA_buffer();
2920     
2921     free(MPID_UGNI_AllAddr);
2922 #if CMK_SMP
2923     sendRdmaBuf = PCQueueCreate();
2924 #else
2925     sendRdmaBuf = 0;
2926 #endif
2927 #if MACHINE_DEBUG_LOG
2928     char ln[200];
2929     sprintf(ln,"debugLog.%d",myrank);
2930     debugLog=fopen(ln,"w");
2931 #endif
2932
2933 }
2934
2935 void* LrtsAlloc(int n_bytes, int header)
2936 {
2937     void *ptr;
2938 #if 0
2939     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2940 #endif
2941     if(n_bytes <= SMSG_MAX_MSG)
2942     {
2943         int totalsize = n_bytes+header;
2944         ptr = malloc(totalsize);
2945     }
2946     else {
2947         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2948 #if     USE_LRTS_MEMPOOL
2949         n_bytes = ALIGN64(n_bytes);
2950         if(n_bytes < BIG_MSG)
2951         {
2952             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2953             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2954         }else 
2955         {
2956             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2957             ptr = res + ALIGNBUF - header;
2958
2959         }
2960 #else
2961         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2962         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2963         ptr = res + ALIGNBUF - header;
2964 #endif
2965     }
2966     return ptr;
2967 }
2968
2969 void  LrtsFree(void *msg)
2970 {
2971     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2972     if (size <= SMSG_MAX_MSG)
2973       free(msg);
2974     else if(size>=BIG_MSG)
2975     {
2976         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2977
2978     }else
2979     {
2980 #if     USE_LRTS_MEMPOOL
2981 #if CMK_SMP
2982         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2983 #else
2984         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2985 #endif
2986 #else
2987         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2988 #endif
2989     }
2990 }
2991
2992 void LrtsExit()
2993 {
2994     /* free memory ? */
2995 #if USE_LRTS_MEMPOOL
2996     mempool_destroy(CpvAccess(mempool));
2997 #endif
2998     PMI_Finalize();
2999     exit(0);
3000 }
3001
3002 void LrtsDrainResources()
3003 {
3004     if(mysize == 1) return;
3005     while (
3006 #if CMK_USE_OOB
3007            !SendBufferMsg(&smsg_oob_queue) ||
3008 #endif
3009            !SendBufferMsg(&smsg_queue))
3010     {
3011         if (useDynamicSMSG)
3012             PumpDatagramConnection();
3013         PumpNetworkSmsg();
3014         PumpLocalRdmaTransactions();
3015         SendRdmaMsg();
3016     }
3017     PMI_Barrier();
3018 }
3019
3020 void LrtsAbort(const char *message) {
3021     printf("CmiAbort is calling on PE:%d\n", myrank);
3022     CmiPrintStackTrace(0);
3023     PMI_Abort(-1, message);
3024 }
3025
3026 /**************************  TIMER FUNCTIONS **************************/
3027 #if CMK_TIMER_USE_SPECIAL
3028 /* MPI calls are not threadsafe, even the timer on some machines */
3029 static CmiNodeLock  timerLock = 0;
3030 static int _absoluteTime = 0;
3031 static int _is_global = 0;
3032 static struct timespec start_ts;
3033
3034 inline int CmiTimerIsSynchronized() {
3035     return 0;
3036 }
3037
3038 inline int CmiTimerAbsolute() {
3039     return _absoluteTime;
3040 }
3041
3042 double CmiStartTimer() {
3043     return 0.0;
3044 }
3045
3046 double CmiInitTime() {
3047     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3048 }
3049
3050 void CmiTimerInit(char **argv) {
3051     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3052     if (_absoluteTime && CmiMyPe() == 0)
3053         printf("Charm++> absolute  timer is used\n");
3054     
3055     _is_global = CmiTimerIsSynchronized();
3056
3057
3058     if (_is_global) {
3059         if (CmiMyRank() == 0) {
3060             clock_gettime(CLOCK_MONOTONIC, &start_ts);
3061         }
3062     } else { /* we don't have a synchronous timer, set our own start time */
3063         CmiBarrier();
3064         CmiBarrier();
3065         CmiBarrier();
3066         clock_gettime(CLOCK_MONOTONIC, &start_ts);
3067     }
3068     CmiNodeAllBarrier();          /* for smp */
3069 }
3070
3071 /**
3072  * Since the timerLock is never created, and is
3073  * always NULL, then all the if-condition inside
3074  * the timer functions could be disabled right
3075  * now in the case of SMP.
3076  */
3077 double CmiTimer(void) {
3078     struct timespec now_ts;
3079     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3080     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3081         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3082 }
3083
3084 double CmiWallTimer(void) {
3085     struct timespec now_ts;
3086     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3087     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3088         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
3089 }
3090
3091 double CmiCpuTimer(void) {
3092     struct timespec now_ts;
3093     clock_gettime(CLOCK_MONOTONIC, &now_ts);
3094     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3095         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
3096 }
3097
3098 #endif
3099 /************Barrier Related Functions****************/
3100
3101 int CmiBarrier()
3102 {
3103     gni_return_t status;
3104
3105 #if CMK_SMP
3106     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
3107     CmiNodeAllBarrier();
3108     if (CmiMyRank() == CmiMyNodeSize())
3109 #else
3110     if (CmiMyRank() == 0)
3111 #endif
3112     {
3113         /**
3114          *  The call of CmiBarrier is usually before the initialization
3115          *  of trace module of Charm++, therefore, the START_EVENT
3116          *  and END_EVENT are disabled here. -Chao Mei
3117          */
3118         /*START_EVENT();*/
3119         status = PMI_Barrier();
3120         GNI_RC_CHECK("PMI_Barrier", status);
3121         /*END_EVENT(10);*/
3122     }
3123     CmiNodeAllBarrier();
3124     return status;
3125
3126 }
3127 #if CMK_DIRECT
3128 #include "machine-cmidirect.c"
3129 #endif
3130 #if CMK_PERSISTENT_COMM
3131 #include "machine-persistent.c"
3132 #endif
3133
3134