automatically set MEM_MAX MAX_SEND
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
12     export CHARM_UGNI_MEMPOOL_MAX=20M
13     export CHARM_UGNI_SEND_MAX=10M
14
15     CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
16     CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
17
18     other environment variables:
19
20     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes     # disable checking deadlock
21     export CHARM_UGNI_BIG_MSG_SIZE=4M           # set big message size protocol
22     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4    # set big message pipe len
23  */
24 /*@{*/
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <stdint.h>
29 #include <errno.h>
30 #include <malloc.h>
31 #include <unistd.h>
32
33 #include <gni_pub.h>
34 #include <pmi.h>
35 #include <time.h>
36
37 #include "converse.h"
38
39 #define PRINT_SYH  0
40
41 // Trace communication thread
42 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
43 #define TRACE_THRESHOLD     0.00005
44 #define CMI_MPI_TRACE_MOREDETAILED 0
45 #undef CMI_MPI_TRACE_USEREVENTS
46 #define CMI_MPI_TRACE_USEREVENTS 1
47 #else
48 #undef CMK_SMP_TRACE_COMMTHREAD
49 #define CMK_SMP_TRACE_COMMTHREAD 0
50 #endif
51
52 #define CMK_TRACE_COMMOVERHEAD 0
53 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
54 #undef CMI_MPI_TRACE_USEREVENTS
55 #define CMI_MPI_TRACE_USEREVENTS 1
56 #else
57 #undef CMK_TRACE_COMMOVERHEAD
58 #define CMK_TRACE_COMMOVERHEAD 0
59 #endif
60
61 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
62 CpvStaticDeclare(double, projTraceStart);
63 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
64 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
65 #else
66 #define  START_EVENT()
67 #define  END_EVENT(x)
68 #endif
69
70 #define CMI_EXERT_SEND_CAP      0
71 #define CMI_EXERT_RECV_CAP      0
72
73 #if CMI_EXERT_SEND_CAP
74 #define SEND_CAP 16
75 #endif
76
77 #if CMI_EXERT_RECV_CAP
78 #define RECV_CAP 2
79 #endif
80
81 #define REMOTE_EVENT                      0
82 #define USE_LRTS_MEMPOOL                  1
83
84 #if USE_LRTS_MEMPOOL
85
86 #define oneMB (1024ll*1024)
87 static CmiInt8 _mempool_size = 8*oneMB;
88 static CmiInt8 _expand_mem =  4*oneMB;
89
90 #endif
91
92 #if CMK_SMP && COMM_THREAD_SEND 
93 //Dynamic flow control about memory registration
94 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
95 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
96 #else
97 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
98 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
99 #endif
100
101 static CmiInt8 buffered_send_msg = 0;
102 static int register_memory_size = 0;
103
104 static int BIG_MSG  =  4*oneMB;
105 static int ONE_SEG  =  2*oneMB;
106 static int BIG_MSG_PIPELINE = 4;
107
108 #if CMK_SMP
109 #define COMM_THREAD_SEND 1
110 //#define MULTI_THREAD_SEND 1
111 #endif
112
113 #if CMK_SMP && MULTI_THREAD_SEND
114 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
115 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
116 #else
117 #define     CMI_GNI_LOCK
118 #define     CMI_GNI_UNLOCK
119 #endif
120
121 static int _checkProgress = 1;             /* check deadlock */
122 static int _detected_hang = 0;
123
124 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
125
126 // dynamic SMSG
127 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
128
129 static int avg_smsg_connection = 32;
130 static int                 *smsg_connected_flag= 0;
131 static gni_smsg_attr_t     **smsg_attr_vector_local;
132 static gni_smsg_attr_t     **smsg_attr_vector_remote;
133 static gni_ep_handle_t     ep_hndl_unbound;
134 static gni_smsg_attr_t     send_smsg_attr;
135 static gni_smsg_attr_t     recv_smsg_attr;
136
137 typedef struct _dynamic_smsg_mailbox{
138    void     *mailbox_base;
139    int      size;
140    int      offset;
141    gni_mem_handle_t  mem_hndl;
142    struct      _dynamic_smsg_mailbox  *next;
143 }dynamic_smsg_mailbox_t;
144
145 static dynamic_smsg_mailbox_t  *mailbox_list;
146
147 int         rdma_id = 0;
148
149 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
150 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
151
152 #if PRINT_SYH
153 int         lrts_send_msg_id = 0;
154 int         lrts_local_done_msg = 0;
155 int         lrts_send_rdma_success = 0;
156 #endif
157
158 #include "machine.h"
159
160 #include "pcqueue.h"
161
162 #include "mempool.h"
163
164 #if CMK_PERSISTENT_COMM
165 #include "machine-persistent.h"
166 #endif
167
168 //#define  USE_ONESIDED 1
169 #ifdef USE_ONESIDED
170 //onesided implementation is wrong, since no place to restore omdh
171 #include "onesided.h"
172 onesided_hnd_t   onesided_hnd;
173 onesided_md_t    omdh;
174 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
175
176 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
177
178 #else
179 uint8_t   onesided_hnd, omdh;
180 #if REMOTE_EVENT
181 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
182          status = GNI_RC_ERROR_NOMEM;} \
183         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
184                 if(status == GNI_RC_SUCCESS) register_memory_size += size;} }
185 #else
186 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
187     do {   \
188         if (register_memory_size + size >= MAX_REG_MEM) { \
189             status = GNI_RC_ERROR_NOMEM; \
190         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
191             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
192     } while(0)
193 #endif
194 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
195     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
196              register_memory_size -= size; \
197          else CmiAbort("MEM_DEregister");  \
198     } while (0)
199 #endif
200
201 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
202 #define   DecreaseMsgInRecv(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)--
203 #define   IncreaseMsgInSend(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)++
204                                    //printf("++++[%d]%p   ----- %d\n", myrank, ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
205 #define   DecreaseMsgInSend(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)--
206                                      //printf("---[%d]%p   ----- %d\n", myrank,((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
207 #define   GetMempoolBlockPtr(x)  ((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr
208 #define   GetMempoolPtr(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mptr
209 #define   GetMempoolsize(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->size
210 #define   GetMemHndl(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mem_hndl
211 #define   GetMemHndlFromHeader(x) ((block_header*)x)->mem_hndl
212 #define   GetSizeFromHeader(x) ((block_header*)x)->size
213 #define   NoMsgInSend(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send == 0
214 #define   NoMsgInRecv(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv == 0
215 #define   NoMsgInFlight(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send + ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv  == 0
216 #define   IsMemHndlZero(x)  ((x).qword1 == 0 && (x).qword2 == 0)
217 #define   SetMemHndlZero(x)  do { (x).qword1 = 0; (x).qword2 = 0; } while (0)
218 #define   NotRegistered(x)  IsMemHndlZero(((block_header*)x)->mem_hndl)
219
220 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
221 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
222 #define CmiGetMsgSeq(m)  ((CmiMsgHeaderExt*)m)->seq
223 #define CmiSetMsgSeq(m, s)  ((((CmiMsgHeaderExt*)m)->seq) = (s))
224
225 #define ALIGNBUF                64
226
227 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
228 /* If SMSG is not used */
229
230 #define FMA_PER_CORE  1024
231 #define FMA_BUFFER_SIZE 1024
232
233 /* If SMSG is used */
234 static int  SMSG_MAX_MSG = 1024;
235 #define SMSG_MAX_CREDIT 72 
236
237 #define MSGQ_MAXSIZE       2048
238 /* large message transfer with FMA or BTE */
239 #define LRTS_GNI_RDMA_THRESHOLD  1024 
240
241 #if CMK_SMP
242 static int  REMOTE_QUEUE_ENTRIES=163840; 
243 static int LOCAL_QUEUE_ENTRIES=163840; 
244 #else
245 static int  REMOTE_QUEUE_ENTRIES=20480;
246 static int LOCAL_QUEUE_ENTRIES=20480; 
247 #endif
248
249 #define BIG_MSG_TAG  0x26
250 #define PUT_DONE_TAG      0x28
251 #define DIRECT_PUT_DONE_TAG      0x29
252 #define ACK_TAG           0x30
253 /* SMSG is data message */
254 #define SMALL_DATA_TAG          0x31
255 /* SMSG is a control message to initialize a BTE */
256 #define MEDIUM_HEAD_TAG         0x32
257 #define MEDIUM_DATA_TAG         0x33
258 #define LMSG_INIT_TAG           0x39 
259 #define VERY_LMSG_INIT_TAG      0x40 
260 #define VERY_LMSG_TAG           0x41 
261
262 #define DEBUG
263 #ifdef GNI_RC_CHECK
264 #undef GNI_RC_CHECK
265 #endif
266 #ifdef DEBUG
267 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
268 #else
269 #define GNI_RC_CHECK(msg,rc)
270 #endif
271
272 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
273 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
274
275 static int useStaticMSGQ = 0;
276 static int useStaticFMA = 0;
277 static int mysize, myrank;
278 gni_nic_handle_t      nic_hndl;
279
280 typedef struct {
281     gni_mem_handle_t mdh;
282     uint64_t addr;
283 } mdh_addr_t ;
284 // this is related to dynamic SMSG
285
286 typedef struct mdh_addr_list{
287     gni_mem_handle_t mdh;
288    void *addr;
289     struct mdh_addr_list *next;
290 }mdh_addr_list_t;
291
292 static unsigned int         smsg_memlen;
293 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
294 mdh_addr_t          setup_mem;
295 mdh_addr_t          *smsg_connection_vec = 0;
296 gni_mem_handle_t    smsg_connection_memhndl;
297 static int          smsg_expand_slots = 10;
298 static int          smsg_available_slot = 0;
299 static void         *smsg_mailbox_mempool = 0;
300 mdh_addr_list_t     *smsg_dynamic_list = 0;
301
302 static void             *smsg_mailbox_base;
303 gni_msgq_attr_t         msgq_attrs;
304 gni_msgq_handle_t       msgq_handle;
305 gni_msgq_ep_attr_t      msgq_ep_attrs;
306 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
307
308 /* =====Beginning of Declarations of Machine Specific Variables===== */
309 static int cookie;
310 static int modes = 0;
311 static gni_cq_handle_t       smsg_rx_cqh = NULL;
312 static gni_cq_handle_t       smsg_tx_cqh = NULL;
313 static gni_cq_handle_t       post_rx_cqh = NULL;
314 static gni_cq_handle_t       post_tx_cqh = NULL;
315 static gni_ep_handle_t       *ep_hndl_array;
316 #if CMK_SMP && MULTI_THREAD_SEND
317 static CmiNodeLock           *ep_lock_array;
318 static CmiNodeLock           tx_cq_lock; 
319 static CmiNodeLock           rx_cq_lock;
320 static CmiNodeLock           *mempool_lock;
321 #endif
322 typedef struct msg_list
323 {
324     uint32_t destNode;
325     uint32_t size;
326     void *msg;
327     uint8_t tag;
328 #if !CMK_SMP
329     struct msg_list *next;
330 #endif
331 }MSG_LIST;
332
333
334 typedef struct control_msg
335 {
336     uint64_t            source_addr;    /* address from the start of buffer  */
337     uint64_t            dest_addr;      /* address from the start of buffer */
338     //int                 source;         /* source rank */
339     int                 total_length;   /* total length */
340     int                 length;         /* length of this packet */
341     uint8_t             seq_id;         //big message   0 meaning single message
342     gni_mem_handle_t    source_mem_hndl;
343     struct control_msg *next;
344 }CONTROL_MSG;
345
346 #ifdef CMK_DIRECT
347 typedef struct{
348     void *recverBuf;
349     void (*callbackFnPtr)(void *);
350     void *callbackData;
351 }CMK_DIRECT_HEADER;
352 #endif
353 typedef struct  rmda_msg
354 {
355     int                   destNode;
356     gni_post_descriptor_t *pd;
357 #if !CMK_SMP
358     struct  rmda_msg      *next;
359 #endif
360 }RDMA_REQUEST;
361
362 #if CMK_SMP
363 PCQueue sendRdmaBuf;
364 typedef struct  msg_list_index
365 {
366     int         next;
367     PCQueue     sendSmsgBuf;
368 } MSG_LIST_INDEX;
369 #else
370 static RDMA_REQUEST        *sendRdmaBuf = 0;
371 static RDMA_REQUEST        *sendRdmaTail = 0;
372 typedef struct  msg_list_index
373 {
374     int         next;
375     MSG_LIST    *sendSmsgBuf;
376     MSG_LIST    *tail;
377 } MSG_LIST_INDEX;
378 #endif
379
380 /* reuse PendingMsg memory */
381 static CONTROL_MSG          *control_freelist=0;
382 static MSG_LIST             *msglist_freelist=0;
383
384 typedef struct smsg_queue
385 {
386     MSG_LIST_INDEX   *smsg_msglist_index;
387     int               smsg_head_index;
388 } SMSG_QUEUE;
389
390 SMSG_QUEUE                  smsg_queue;
391 SMSG_QUEUE                  smsg_oob_queue;
392
393 #if CMK_SMP
394
395 #define FreeMsgList(d)   free(d);
396 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
397
398 #else
399
400 #define FreeMsgList(d)  \
401   (d)->next = msglist_freelist;\
402   msglist_freelist = d;
403
404 #define MallocMsgList(d) \
405   d = msglist_freelist;\
406   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
407              _MEMCHECK(d);\
408   } else msglist_freelist = d->next; \
409   d->next =0;
410 #endif
411
412 #if CMK_SMP
413
414 #define FreeControlMsg(d)      free(d);
415 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
416
417 #else
418
419 #define FreeControlMsg(d)       \
420   (d)->next = control_freelist;\
421   control_freelist = d;
422
423 #define MallocControlMsg(d) \
424   d = control_freelist;\
425   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
426              _MEMCHECK(d);\
427   } else control_freelist = d->next;
428
429 #endif
430
431 static RDMA_REQUEST         *rdma_freelist = NULL;
432
433 #define FreeMediumControlMsg(d)       \
434   (d)->next = medium_control_freelist;\
435   medium_control_freelist = d;
436
437
438 #define MallocMediumControlMsg(d) \
439     d = medium_control_freelist;\
440     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
441     _MEMCHECK(d);\
442 } else mediumcontrol_freelist = d->next;
443
444 # if CMK_SMP
445 #define FreeRdmaRequest(d)       free(d);
446 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
447 #else
448
449 #define FreeRdmaRequest(d)       \
450   (d)->next = rdma_freelist;\
451   rdma_freelist = d;
452
453 #define MallocRdmaRequest(d) \
454   d = rdma_freelist;\
455   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
456              _MEMCHECK(d);\
457   } else rdma_freelist = d->next; \
458     d->next =0;
459 #endif
460
461 /* reuse gni_post_descriptor_t */
462 static gni_post_descriptor_t *post_freelist=0;
463
464 #if  !CMK_SMP
465 #define FreePostDesc(d)       \
466     (d)->next_descr = post_freelist;\
467     post_freelist = d;
468
469 #define MallocPostDesc(d) \
470   d = post_freelist;\
471   if (d==0) { \
472      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
473      _MEMCHECK(d);\
474   } else post_freelist = d->next_descr;
475 #else
476
477 #define FreePostDesc(d)     free(d);
478 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
479
480 #endif
481
482
483 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
484 static int      buffered_smsg_counter = 0;
485
486 /* SmsgSend return success but message sent is not confirmed by remote side */
487 static MSG_LIST *buffered_fma_head = 0;
488 static MSG_LIST *buffered_fma_tail = 0;
489
490 /* functions  */
491 #define IsFree(a,ind)  !( a& (1<<(ind) ))
492 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
493 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
494
495 CpvDeclare(mempool_type*, mempool);
496
497 /* get the upper bound of log 2 */
498 int mylog2(int size)
499 {
500     int op = size;
501     unsigned int ret=0;
502     unsigned int mask = 0;
503     int i;
504     while(op>0)
505     {
506         op = op >> 1;
507         ret++;
508
509     }
510     for(i=1; i<ret; i++)
511     {
512         mask = mask << 1;
513         mask +=1;
514     }
515
516     ret -= ((size &mask) ? 0:1);
517     return ret;
518 }
519
520 static void
521 allgather(void *in,void *out, int len)
522 {
523     static int *ivec_ptr=NULL,already_called=0,job_size=0;
524     int i,rc;
525     int my_rank;
526     char *tmp_buf,*out_ptr;
527
528     if(!already_called) {
529
530         rc = PMI_Get_size(&job_size);
531         CmiAssert(rc == PMI_SUCCESS);
532         rc = PMI_Get_rank(&my_rank);
533         CmiAssert(rc == PMI_SUCCESS);
534
535         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
536         CmiAssert(ivec_ptr != NULL);
537
538         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
539         CmiAssert(rc == PMI_SUCCESS);
540
541         already_called = 1;
542
543     }
544
545     tmp_buf = (char *)malloc(job_size * len);
546     CmiAssert(tmp_buf);
547
548     rc = PMI_Allgather(in,tmp_buf,len);
549     CmiAssert(rc == PMI_SUCCESS);
550
551     out_ptr = out;
552
553     for(i=0;i<job_size;i++) {
554
555         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
556
557     }
558
559     free(tmp_buf);
560 }
561
562 static void
563 allgather_2(void *in,void *out, int len)
564 {
565     //PMI_Allgather is out of order
566     int i,rc, extend_len;
567     int  rank_index;
568     char *out_ptr, *out_ref;
569     char *in2;
570
571     extend_len = sizeof(int) + len;
572     in2 = (char*)malloc(extend_len);
573
574     memcpy(in2, &myrank, sizeof(int));
575     memcpy(in2+sizeof(int), in, len);
576
577     out_ptr = (char*)malloc(mysize*extend_len);
578
579     rc = PMI_Allgather(in2, out_ptr, extend_len);
580     GNI_RC_CHECK("allgather", rc);
581
582     out_ref = out;
583
584     for(i=0;i<mysize;i++) {
585         //rank index 
586         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
587         //copy to the rank index slot
588         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
589     }
590
591     free(out_ptr);
592     free(in2);
593
594 }
595
596 static unsigned int get_gni_nic_address(int device_id)
597 {
598     unsigned int address, cpu_id;
599     gni_return_t status;
600     int i, alps_dev_id=-1,alps_address=-1;
601     char *token, *p_ptr;
602
603     p_ptr = getenv("PMI_GNI_DEV_ID");
604     if (!p_ptr) {
605         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
606        
607         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
608     } else {
609         while ((token = strtok(p_ptr,":")) != NULL) {
610             alps_dev_id = atoi(token);
611             if (alps_dev_id == device_id) {
612                 break;
613             }
614             p_ptr = NULL;
615         }
616         CmiAssert(alps_dev_id != -1);
617         p_ptr = getenv("PMI_GNI_LOC_ADDR");
618         CmiAssert(p_ptr != NULL);
619         i = 0;
620         while ((token = strtok(p_ptr,":")) != NULL) {
621             if (i == alps_dev_id) {
622                 alps_address = atoi(token);
623                 break;
624             }
625             p_ptr = NULL;
626             ++i;
627         }
628         CmiAssert(alps_address != -1);
629         address = alps_address;
630     }
631     return address;
632 }
633
634 static uint8_t get_ptag(void)
635 {
636     char *p_ptr, *token;
637     uint8_t ptag;
638
639     p_ptr = getenv("PMI_GNI_PTAG");
640     CmiAssert(p_ptr != NULL);
641     token = strtok(p_ptr, ":");
642     ptag = (uint8_t)atoi(token);
643     return ptag;
644         
645 }
646
647 static uint32_t get_cookie(void)
648 {
649     uint32_t cookie;
650     char *p_ptr, *token;
651
652     p_ptr = getenv("PMI_GNI_COOKIE");
653     CmiAssert(p_ptr != NULL);
654     token = strtok(p_ptr, ":");
655     cookie = (uint32_t)atoi(token);
656
657     return cookie;
658 }
659
660 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
661 /* TODO: add any that are related */
662 /* =====End of Definitions of Message-Corruption Related Macros=====*/
663
664
665 #include "machine-lrts.h"
666 #include "machine-common-core.c"
667
668 /* Network progress function is used to poll the network when for
669    messages. This flushes receive buffers on some  implementations*/
670 #if CMK_MACHINE_PROGRESS_DEFINED
671 void CmiMachineProgressImpl() {
672 }
673 #endif
674
675 static void SendRdmaMsg();
676 static void PumpNetworkSmsg();
677 static void PumpLocalRdmaTransactions();
678 static int SendBufferMsg(SMSG_QUEUE *queue);
679
680 #if MACHINE_DEBUG_LOG
681 FILE *debugLog = NULL;
682 static CmiInt8 buffered_recv_msg = 0;
683 int         lrts_smsg_success = 0;
684 int         lrts_received_msg = 0;
685 #endif
686
687 static void sweep_mempool(mempool_type *mptr)
688 {
689     block_header *current = &(mptr->block_head);
690
691     printf("[n %d] sweep_mempool slot START.\n", myrank);
692     while( current!= NULL) {
693         printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
694         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
695     }
696     printf("[n %d] sweep_mempool slot END.\n", myrank);
697 }
698
699 inline
700 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
701 {
702     block_header *current = *from;
703
704     //while(register_memory_size>= MAX_REG_MEM)
705     //{
706         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
707             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
708
709         *from = current;
710         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
711         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh, GetSizeFromHeader(current));
712         SetMemHndlZero(GetMemHndlFromHeader(current));
713     //}
714     return GNI_RC_SUCCESS;
715 }
716
717 inline 
718 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
719 {
720     gni_return_t status = GNI_RC_SUCCESS;
721     //int size = GetMempoolsize(msg);
722     //void *blockaddr = GetMempoolBlockPtr(msg);
723     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
724    
725     block_header *current = &(mptr->block_head);
726     while(register_memory_size>= MAX_REG_MEM)
727     {
728         status = deregisterMemory(mptr, &current);
729         if (status != GNI_RC_SUCCESS) break;
730     }
731     if(register_memory_size>= MAX_REG_MEM) return status;
732
733     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
734     while(1)
735     {
736         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
737         if(status == GNI_RC_SUCCESS)
738         {
739             break;
740         }
741         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
742         {
743             CmiAbort("Memory registor for mempool fails\n");
744         }
745         else
746         {
747             status = deregisterMemory(mptr, &current);
748             if (status != GNI_RC_SUCCESS) break;
749         }
750     }; 
751     return status;
752 }
753
754 inline 
755 static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
756 {
757     static int rank = -1;
758     int i;
759     gni_return_t status;
760     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
761     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
762     mempool_type *mptr;
763
764     status = registerFromMempool(mptr1, msg, size, t);
765     if (status == GNI_RC_SUCCESS) return status;
766 #if CMK_SMP 
767     for (i=0; i<CmiMyNodeSize()+1; i++) {
768       rank = (rank+1)%(CmiMyNodeSize()+1);
769       mptr = CpvAccessOther(mempool, rank);
770       if (mptr == mptr1) continue;
771       status = registerFromMempool(mptr, msg, size, t);
772       if (status == GNI_RC_SUCCESS) return status;
773     }
774 #endif
775     return  GNI_RC_ERROR_RESOURCE;
776 }
777
778 inline
779 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
780 {
781     MSG_LIST        *msg_tmp;
782     MallocMsgList(msg_tmp);
783     msg_tmp->destNode = destNode;
784     msg_tmp->size   = size;
785     msg_tmp->msg    = msg;
786     msg_tmp->tag    = tag;
787
788 #if !CMK_SMP
789     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
790         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
791         queue->smsg_head_index = destNode;
792         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
793     }else
794     {
795         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
796         queue->smsg_msglist_index[destNode].tail = msg_tmp;
797     }
798 #else
799     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
800 #endif
801 #if PRINT_SYH
802     buffered_smsg_counter++;
803 #endif
804 }
805
806 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
807 {
808     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
809 }
810
811 inline
812 static void setup_smsg_connection(int destNode)
813 {
814     mdh_addr_list_t  *new_entry = 0;
815     gni_post_descriptor_t *pd;
816     gni_smsg_attr_t      *smsg_attr;
817     gni_return_t status = GNI_RC_NOT_DONE;
818     RDMA_REQUEST        *rdma_request_msg;
819     
820     if(smsg_available_slot == smsg_expand_slots)
821     {
822         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
823         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
824         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
825
826         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
827             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
828             GNI_MEM_READWRITE,   
829             -1,
830             &(new_entry->mdh));
831         smsg_available_slot = 0; 
832         new_entry->next = smsg_dynamic_list;
833         smsg_dynamic_list = new_entry;
834     }
835     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
836     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
837     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
838     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
839     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
840     smsg_attr->buff_size = smsg_memlen;
841     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
842     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
843     smsg_local_attr_vec[destNode] = smsg_attr;
844     smsg_available_slot++;
845     MallocPostDesc(pd);
846     pd->type            = GNI_POST_FMA_PUT;
847     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
848     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
849     pd->length          = sizeof(gni_smsg_attr_t);
850     pd->local_addr      = (uint64_t) smsg_attr;
851     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
852     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
853     pd->src_cq_hndl     = 0;
854     pd->rdma_mode       = 0;
855     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
856     print_smsg_attr(smsg_attr);
857     if(status == GNI_RC_ERROR_RESOURCE )
858     {
859         MallocRdmaRequest(rdma_request_msg);
860         rdma_request_msg->destNode = destNode;
861         rdma_request_msg->pd = pd;
862         /* buffer this request */
863     }
864 #if PRINT_SYH
865     if(status != GNI_RC_SUCCESS)
866        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
867     else
868         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
869 #endif
870 }
871
872 /* useDynamicSMSG */
873 inline 
874 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
875 {
876     gni_return_t status = GNI_RC_NOT_DONE;
877
878     if(mailbox_list->offset == mailbox_list->size)
879     {
880         dynamic_smsg_mailbox_t *new_mailbox_entry;
881         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
882         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
883         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
884         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
885         new_mailbox_entry->offset = 0;
886         
887         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
888             new_mailbox_entry->size, smsg_rx_cqh,
889             GNI_MEM_READWRITE,   
890             -1,
891             &(new_mailbox_entry->mem_hndl));
892
893         GNI_RC_CHECK("register", status);
894         new_mailbox_entry->next = mailbox_list;
895         mailbox_list = new_mailbox_entry;
896     }
897     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
898     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
899     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
900     local_smsg_attr->mbox_offset = mailbox_list->offset;
901     mailbox_list->offset += smsg_memlen;
902     local_smsg_attr->buff_size = smsg_memlen;
903     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
904     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
905 }
906
907 /* useDynamicSMSG */
908 inline 
909 static int connect_to(int destNode)
910 {
911     gni_return_t status = GNI_RC_NOT_DONE;
912     CmiAssert(smsg_connected_flag[destNode] == 0);
913     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
914     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
915     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
916     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
917     
918     CMI_GNI_LOCK
919     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
920     CMI_GNI_UNLOCK
921     if (status == GNI_RC_ERROR_RESOURCE) {
922       /* possibly destNode is making connection at the same time */
923       free(smsg_attr_vector_local[destNode]);
924       smsg_attr_vector_local[destNode] = NULL;
925       free(smsg_attr_vector_remote[destNode]);
926       smsg_attr_vector_remote[destNode] = NULL;
927       mailbox_list->offset -= smsg_memlen;
928       return 0;
929     }
930     GNI_RC_CHECK("GNI_Post", status);
931     smsg_connected_flag[destNode] = 1;
932     return 1;
933 }
934
935 inline 
936 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
937 {
938     unsigned int          remote_address;
939     uint32_t              remote_id;
940     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
941     gni_smsg_attr_t       *smsg_attr;
942     gni_post_descriptor_t *pd;
943     gni_post_state_t      post_state;
944     char                  *real_data; 
945
946     if (useDynamicSMSG) {
947         switch (smsg_connected_flag[destNode]) {
948         case 0: 
949             connect_to(destNode);         /* continue to case 1 */
950         case 1:                           /* pending connection, do nothing */
951             status = GNI_RC_NOT_DONE;
952             if(inbuff ==0)
953                 buffer_small_msgs(queue, msg, size, destNode, tag);
954             return status;
955         }
956     }
957 #if CMK_SMP
958     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
959     {
960 #else
961     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
962     {
963 #endif
964         CMI_GNI_LOCK
965         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
966         CMI_GNI_UNLOCK
967         if(status == GNI_RC_SUCCESS)
968         {
969 #if CMK_SMP_TRACE_COMMTHREAD
970             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
971             { 
972                 START_EVENT();
973                 if ( tag == SMALL_DATA_TAG)
974                     real_data = (char*)msg; 
975                 else 
976                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
977                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
978             }
979 #endif
980             smsg_send_count ++;
981             return status;
982         }else
983             status = GNI_RC_ERROR_RESOURCE;
984     }
985     if(inbuff ==0)
986         buffer_small_msgs(queue, msg, size, destNode, tag);
987     return status;
988 }
989
990
991 inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
992 {
993     /* construct a control message and send */
994     CONTROL_MSG         *control_msg_tmp;
995     MallocControlMsg(control_msg_tmp);
996     control_msg_tmp->source_addr = (uint64_t)msg;
997     control_msg_tmp->seq_id    = seqno;
998     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
999 #if     USE_LRTS_MEMPOOL
1000     if(size < BIG_MSG)
1001     {
1002         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1003     }
1004     else
1005     {
1006         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1007         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1008         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1009     }
1010 #else
1011     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1012 #endif
1013     return control_msg_tmp;
1014 }
1015
1016 // Large message, send control to receiver, receiver register memory and do a GET, 
1017 // return 1 - send no success
1018 inline
1019 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1020 {
1021     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1022     uint32_t            vmdh_index  = -1;
1023     int                 size;
1024     int                 offset = 0;
1025     uint64_t            source_addr;
1026     int                 register_size; 
1027     void                *msg;
1028
1029     size    =   control_msg_tmp->total_length;
1030     source_addr = control_msg_tmp->source_addr;
1031     register_size = control_msg_tmp->length;
1032
1033 #if     USE_LRTS_MEMPOOL
1034     if( control_msg_tmp->seq_id == 0 ){
1035         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1036         {
1037             msg = (void*)source_addr;
1038             //register the corresponding mempool
1039             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1040             {
1041                 if(!inbuff)
1042                     buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1043                 return GNI_RC_ERROR_NOMEM;
1044             }
1045             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1046             if(status == GNI_RC_SUCCESS)
1047             {
1048                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1049             }
1050         }else
1051         {
1052             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1053             status = GNI_RC_SUCCESS;
1054         }
1055         if(NoMsgInSend( control_msg_tmp->source_addr))
1056             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1057         else
1058             register_size = 0;
1059     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1060     {
1061         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1062         source_addr += offset;
1063         size = control_msg_tmp->length;
1064         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1065             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1066             {
1067                 if(!inbuff)
1068                     buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1069                 return GNI_RC_ERROR_NOMEM;
1070             }
1071             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1072             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1073         }
1074         else
1075         {
1076             status = GNI_RC_SUCCESS;
1077         }
1078         register_size = 0;  
1079     }
1080
1081     if(status == GNI_RC_SUCCESS)
1082     {
1083         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
1084         if(status == GNI_RC_SUCCESS)
1085         {
1086             buffered_send_msg += register_size;
1087             if(control_msg_tmp->seq_id == 0)
1088             {
1089                 IncreaseMsgInSend(source_addr);
1090             }
1091             FreeControlMsg(control_msg_tmp);
1092             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1093         }else
1094             status = GNI_RC_ERROR_RESOURCE;
1095
1096     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1097     {
1098         CmiAbort("Memory registor for large msg\n");
1099     }else 
1100     {
1101         status = GNI_RC_ERROR_NOMEM; 
1102         if(!inbuff)
1103             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1104     }
1105     return status;
1106 #else
1107     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1108     if(status == GNI_RC_SUCCESS)
1109     {
1110         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
1111         if(status == GNI_RC_SUCCESS)
1112         {
1113             FreeControlMsg(control_msg_tmp);
1114         }
1115     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1116     {
1117         CmiAbort("Memory registor for large msg\n");
1118     }else 
1119     {
1120         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1121     }
1122     return status;
1123 #endif
1124 }
1125
1126 inline void LrtsPrepareEnvelope(char *msg, int size)
1127 {
1128     CmiSetMsgSize(msg, size);
1129 }
1130
1131 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1132 {
1133     gni_return_t        status  =   GNI_RC_SUCCESS;
1134     uint8_t tag;
1135     CONTROL_MSG         *control_msg_tmp;
1136     int                 oob = ( mode & OUT_OF_BAND);
1137     SMSG_QUEUE          *queue;
1138
1139     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1140 #if CMK_USE_OOB
1141     queue = oob? &smsg_oob_queue : &smsg_queue;
1142 #else
1143     queue = &smsg_queue;
1144 #endif
1145
1146     LrtsPrepareEnvelope(msg, size);
1147
1148 #if PRINT_SYH
1149     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1150 #endif 
1151 #if CMK_SMP && COMM_THREAD_SEND
1152     if(size <= SMSG_MAX_MSG)
1153         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1154     else if (size < BIG_MSG) {
1155         control_msg_tmp =  construct_control_msg(size, msg, 0);
1156         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1157     }
1158     else {
1159           CmiSetMsgSeq(msg, 0);
1160           control_msg_tmp =  construct_control_msg(size, msg, 1);
1161           buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1162     }
1163 #else   //non-smp, smp(worker sending)
1164     if(size <= SMSG_MAX_MSG)
1165     {
1166         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1167             CmiFree(msg);
1168     }
1169     else if (size < BIG_MSG) {
1170         control_msg_tmp =  construct_control_msg(size, msg, 0);
1171         send_large_messages(queue, destNode, control_msg_tmp, 0);
1172     }
1173     else {
1174 #if     USE_LRTS_MEMPOOL
1175         CmiSetMsgSeq(msg, 0);
1176         control_msg_tmp =  construct_control_msg(size, msg, 1);
1177         send_large_messages(queue, destNode, control_msg_tmp, 0);
1178 #else
1179         control_msg_tmp =  construct_control_msg(size, msg, 0);
1180         send_large_messages(queue, destNode, control_msg_tmp, 0);
1181 #endif
1182     }
1183 #endif
1184     return 0;
1185 }
1186
1187 static void    PumpDatagramConnection();
1188 static void registerUserTraceEvents() {
1189 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1190     traceRegisterUserEvent("setting up connections", 10);
1191     traceRegisterUserEvent("Receiving small msgs", 20);
1192     traceRegisterUserEvent("Release local transaction", 30);
1193     traceRegisterUserEvent("Sending buffered small msgs", 40);
1194     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1195 #endif
1196 }
1197
1198 static void ProcessDeadlock()
1199 {
1200     static CmiUInt8 *ptr = NULL;
1201     static CmiUInt8  last = 0, mysum, sum;
1202     static int count = 0;
1203     gni_return_t status;
1204     int i;
1205
1206 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1207 //sweep_mempool(CpvAccess(mempool));
1208     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1209     mysum = smsg_send_count + smsg_recv_count;
1210     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1211     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1212     GNI_RC_CHECK("PMI_Allgather", status);
1213     sum = 0;
1214     for (i=0; i<mysize; i++)  sum+= ptr[i];
1215     if (last == 0 || sum == last) 
1216         count++;
1217     else
1218         count = 0;
1219     last = sum;
1220     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1221     if (count == 2) { 
1222         /* detected twice, it is a real deadlock */
1223         if (myrank == 0)  {
1224             CmiPrintf("Charm++> network progress engine stalled, program may hang. Try set environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX to limit the registered memory usage. (%d, %d)\n", buffered_send_msg, register_memory_size);
1225             CmiAbort("Fatal> Deadlock detected.");
1226         }
1227     }
1228     _detected_hang = 0;
1229 }
1230
1231 static void CheckProgress()
1232 {
1233     if (smsg_send_count == last_smsg_send_count &&
1234         smsg_recv_count == last_smsg_recv_count ) 
1235     {
1236         _detected_hang = 1;
1237 #if !CMK_SMP
1238         if (_detected_hang) ProcessDeadlock();
1239 #endif
1240
1241     }
1242     else {
1243         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1244         last_smsg_send_count = smsg_send_count;
1245         last_smsg_recv_count = smsg_recv_count;
1246         _detected_hang = 0;
1247     }
1248 }
1249
1250 static void set_limit()
1251 {
1252     if (CmiMyRank() == 0) {
1253         int mynode = CmiPhysicalNodeID(CmiMyPe());
1254         int numpes = CmiNumPesOnPhysicalNode(mynode);
1255         int numprocesses = numpes / CmiMyNodeSize();
1256         int totalmem = 1024*1024*1024*0.9;
1257         int mem_max = totalmem / numprocesses;
1258         int send_max = mem_max / 2;
1259        if(CmiMyPe() == 0)
1260            printf("mem_max = %d, send_max =%d\n", mem_max, send_max);
1261     }
1262 }
1263
1264 void LrtsPostCommonInit(int everReturn)
1265 {
1266 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1267     CpvInitialize(double, projTraceStart);
1268     /* only PE 0 needs to care about registration (to generate sts file). */
1269     if (CmiMyPe() == 0) {
1270         registerMachineUserEventsFunction(&registerUserTraceEvents);
1271     }
1272 #endif
1273
1274 #if CMK_SMP
1275     CmiIdleState *s=CmiNotifyGetState();
1276     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1277     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1278 #else
1279     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1280     if (useDynamicSMSG)
1281     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1282 #endif
1283
1284     if (_checkProgress)
1285 #if CMK_SMP
1286     if (CmiMyRank() == 0)
1287 #endif
1288     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1289
1290     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1291 }
1292
1293 /* this is called by worker thread */
1294 void LrtsPostNonLocal(){
1295 #if CMK_SMP
1296 #if MULTI_THREAD_SEND
1297     if(mysize == 1) return;
1298     PumpLocalRdmaTransactions();
1299 #if CMK_USE_OOB
1300     if (SendBufferMsg(&smsg_oob_queue) == 1)
1301 #endif
1302     SendBufferMsg(&smsg_queue);
1303     SendRdmaMsg();
1304 #endif
1305 #endif
1306 }
1307
1308 /* useDynamicSMSG */
1309 static void    PumpDatagramConnection()
1310 {
1311     uint32_t          remote_address;
1312     uint32_t          remote_id;
1313     gni_return_t status;
1314     gni_post_state_t  post_state;
1315     uint64_t          datagram_id;
1316     int i;
1317
1318    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1319    {
1320        if (datagram_id >= mysize) {           /* bound endpoint */
1321            int pe = datagram_id - mysize;
1322            CMI_GNI_LOCK
1323            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1324            CMI_GNI_UNLOCK
1325            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1326            {
1327                CmiAssert(remote_id == pe);
1328                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1329                GNI_RC_CHECK("Dynamic SMSG Init", status);
1330 #if PRINT_SYH
1331                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1332 #endif
1333                CmiAssert(smsg_connected_flag[pe] == 1);
1334                smsg_connected_flag[pe] = 2;
1335            }
1336        }
1337        else {         /* unbound ep */
1338            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1339            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1340            {
1341                CmiAssert(remote_id<mysize);
1342                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1343                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1344                GNI_RC_CHECK("Dynamic SMSG Init", status);
1345 #if PRINT_SYH
1346                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1347 #endif
1348                smsg_connected_flag[remote_id] = 2;
1349
1350                alloc_smsg_attr(&send_smsg_attr);
1351                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1352                GNI_RC_CHECK("post unbound datagram", status);
1353            }
1354        }
1355    }
1356 }
1357
1358 /* pooling CQ to receive network message */
1359 static void PumpNetworkRdmaMsgs()
1360 {
1361     gni_cq_entry_t      event_data;
1362     gni_return_t        status;
1363
1364 }
1365
1366 inline 
1367 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1368 {
1369     RDMA_REQUEST        *rdma_request_msg;
1370     MallocRdmaRequest(rdma_request_msg);
1371     rdma_request_msg->destNode = inst_id;
1372     rdma_request_msg->pd = pd;
1373 #if CMK_SMP
1374     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1375 #else
1376     if(sendRdmaBuf == 0)
1377     {
1378         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1379     }else{
1380         sendRdmaTail->next = rdma_request_msg;
1381         sendRdmaTail =  rdma_request_msg;
1382     }
1383 #endif
1384
1385 }
1386 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1387 static void PumpNetworkSmsg()
1388 {
1389     uint64_t            inst_id;
1390     int                 ret;
1391     gni_cq_entry_t      event_data;
1392     gni_return_t        status, status2;
1393     void                *header;
1394     uint8_t             msg_tag;
1395     int                 msg_nbytes;
1396     void                *msg_data;
1397     gni_mem_handle_t    msg_mem_hndl;
1398     gni_smsg_attr_t     *smsg_attr;
1399     gni_smsg_attr_t     *remote_smsg_attr;
1400     int                 init_flag;
1401     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1402     uint64_t            source_addr;
1403     SMSG_QUEUE         *queue = &smsg_queue;
1404
1405     while(1)
1406     {
1407         CMI_GNI_LOCK
1408         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1409         CMI_GNI_UNLOCK
1410         if(status != GNI_RC_SUCCESS)
1411             break;
1412         inst_id = GNI_CQ_GET_INST_ID(event_data);
1413         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1414 #if PRINT_SYH
1415         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1416 #endif
1417         if (useDynamicSMSG) {
1418             /* subtle: smsg may come before connection is setup */
1419             while (smsg_connected_flag[inst_id] != 2) 
1420                PumpDatagramConnection();
1421         }
1422         msg_tag = GNI_SMSG_ANY_TAG;
1423         while(1) {
1424             CMI_GNI_LOCK
1425             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1426             CMI_GNI_UNLOCK
1427             if (status != GNI_RC_SUCCESS)
1428                 break;
1429 #if PRINT_SYH
1430             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1431 #endif
1432             /* copy msg out and then put into queue (small message) */
1433             switch (msg_tag) {
1434             case SMALL_DATA_TAG:
1435             {
1436                 START_EVENT();
1437                 msg_nbytes = CmiGetMsgSize(header);
1438                 msg_data    = CmiAlloc(msg_nbytes);
1439                 memcpy(msg_data, (char*)header, msg_nbytes);
1440 #if CMK_SMP_TRACE_COMMTHREAD
1441                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1442 #endif
1443                 handleOneRecvedMsg(msg_nbytes, msg_data);
1444                 break;
1445             }
1446             case LMSG_INIT_TAG:
1447             {
1448                 getLargeMsgRequest(header, inst_id);
1449                 break;
1450             }
1451             case ACK_TAG:   //msg fit into mempool
1452             {
1453                 /* Get is done, release message . Now put is not used yet*/
1454                 void *msg = (void*)(((CONTROL_MSG *)header)->source_addr);
1455 #if ! USE_LRTS_MEMPOOL
1456                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh, header_tmp->length);
1457 #else
1458                 DecreaseMsgInSend(msg);
1459 #endif
1460                 if(NoMsgInSend(msg))
1461                     buffered_send_msg -= GetMempoolsize(msg);
1462                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1463                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
1464                 break;
1465             }
1466             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1467             {
1468                 header_tmp = (CONTROL_MSG *) header;
1469                 void *msg = (void*)(header_tmp->source_addr);
1470                 int cur_seq = CmiGetMsgSeq(msg);
1471                 int offset = ONE_SEG*(cur_seq+1);
1472                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1473                 buffered_send_msg -= header_tmp->length;
1474                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1475                 if (remain_size < 0) remain_size = 0;
1476                 CmiSetMsgSize(msg, remain_size);
1477                 if(remain_size <= 0) //transaction done
1478                 {
1479                     CmiFree(msg);
1480                 }else if (header_tmp->total_length > offset)
1481                 {
1482                     CmiSetMsgSeq(msg, cur_seq+1);
1483                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1484                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1485                     //send next seg
1486                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1487                          // pipelining
1488                     if (header_tmp->seq_id == 1) {
1489                       int i;
1490                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1491                         int seq = cur_seq+i+2;
1492                         CmiSetMsgSeq(msg, seq-1);
1493                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1494                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1495                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1496                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1497                       }
1498                     }
1499                 }
1500                 break;
1501             }
1502 #if CMK_PERSISTENT_COMM
1503             case PUT_DONE_TAG: //persistent message
1504                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1505                 int size = ((CONTROL_MSG *) header)->length;
1506                 CmiReference(msg);
1507                 handleOneRecvedMsg(size, msg); 
1508                 break;
1509 #endif
1510 #ifdef CMK_DIRECT
1511             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1512                 (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1513                 break;
1514 #endif
1515             default: {
1516                 printf("weird tag problem\n");
1517                 CmiAbort("Unknown tag\n");
1518                      }
1519             }               // end switch
1520 #if PRINT_SYH
1521             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1522 #endif
1523             CMI_GNI_LOCK
1524             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1525             CMI_GNI_UNLOCK
1526             smsg_recv_count ++;
1527             msg_tag = GNI_SMSG_ANY_TAG;
1528         } //endwhile getNext
1529     }   //end while GetEvent
1530     if(status == GNI_RC_ERROR_RESOURCE)
1531     {
1532         GNI_RC_CHECK("Smsg_rx_cq full", status);
1533     }
1534 }
1535
1536 static void printDesc(gni_post_descriptor_t *pd)
1537 {
1538     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1539 }
1540
1541 // for BIG_MSG called on receiver side for receiving control message
1542 // LMSG_INIT_TAG
1543 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1544 {
1545 #if     USE_LRTS_MEMPOOL
1546     CONTROL_MSG         *request_msg;
1547     gni_return_t        status = GNI_RC_SUCCESS;
1548     void                *msg_data;
1549     gni_post_descriptor_t *pd;
1550     gni_mem_handle_t    msg_mem_hndl;
1551     int source, size, transaction_size, offset = 0;
1552     int     register_size = 0;
1553
1554     // initial a get to transfer data from the sender side */
1555     request_msg = (CONTROL_MSG *) header;
1556     size = request_msg->total_length;
1557     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1558     if(request_msg->seq_id < 2)   {
1559         msg_data = CmiAlloc(size);
1560         CmiSetMsgSeq(msg_data, 0);
1561         _MEMCHECK(msg_data);
1562     }
1563     else {
1564         offset = ONE_SEG*(request_msg->seq_id-1);
1565         msg_data = (char*)request_msg->dest_addr + offset;
1566     }
1567    
1568     MallocPostDesc(pd);
1569     pd->cqwrite_value = request_msg->seq_id;
1570     if( request_msg->seq_id == 0)
1571     {
1572         pd->local_mem_hndl= GetMemHndl(msg_data);
1573         transaction_size = ALIGN64(size);
1574         if(IsMemHndlZero(pd->local_mem_hndl))
1575         {   
1576             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1577             if(status == GNI_RC_SUCCESS)
1578             {
1579                 pd->local_mem_hndl = GetMemHndl(msg_data);
1580             }
1581             else
1582             {
1583                 SetMemHndlZero(pd->local_mem_hndl);
1584             }
1585         }
1586         if(NoMsgInRecv( (void*)(msg_data)))
1587             register_size = GetMempoolsize((void*)(msg_data));
1588         else
1589             register_size = 0;
1590     }
1591     else{
1592         transaction_size = ALIGN64(request_msg->length);
1593         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1594         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1595         {
1596             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1597         }
1598     }
1599     pd->first_operand = ALIGN64(size);                   //  total length
1600
1601     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1602         pd->type            = GNI_POST_FMA_GET;
1603     else
1604         pd->type            = GNI_POST_RDMA_GET;
1605 #if REMOTE_EVENT
1606     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1607 #else
1608     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1609 #endif
1610     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1611     pd->length          = transaction_size;
1612     pd->local_addr      = (uint64_t) msg_data;
1613     pd->remote_addr     = request_msg->source_addr + offset;
1614     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1615     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1616     pd->rdma_mode       = 0;
1617     pd->amo_cmd         = 0;
1618
1619     //memory registration success
1620     if(status == GNI_RC_SUCCESS)
1621     {
1622         CMI_GNI_LOCK
1623         if(pd->type == GNI_POST_RDMA_GET) 
1624             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1625         else
1626             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1627         CMI_GNI_UNLOCK
1628
1629         if(status == GNI_RC_SUCCESS )
1630         {
1631             if(pd->cqwrite_value == 0)
1632             {
1633 #if MACHINE_DEBUG_LOG
1634                 buffered_recv_msg += register_size;
1635                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1636 #endif
1637                 IncreaseMsgInRecv(msg_data);
1638             }
1639         }
1640     }else
1641     {
1642         SetMemHndlZero((pd->local_mem_hndl));
1643     }
1644     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1645     {
1646         bufferRdmaMsg(inst_id, pd); 
1647     }else {
1648          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1649         GNI_RC_CHECK("GetLargeAFter posting", status);
1650     }
1651 #else
1652     CONTROL_MSG         *request_msg;
1653     gni_return_t        status;
1654     void                *msg_data;
1655     gni_post_descriptor_t *pd;
1656     RDMA_REQUEST        *rdma_request_msg;
1657     gni_mem_handle_t    msg_mem_hndl;
1658     //int source;
1659     // initial a get to transfer data from the sender side */
1660     request_msg = (CONTROL_MSG *) header;
1661     msg_data = CmiAlloc(request_msg->length);
1662     _MEMCHECK(msg_data);
1663
1664     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1665
1666     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1667     {
1668         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1669     }
1670
1671     MallocPostDesc(pd);
1672     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1673         pd->type            = GNI_POST_FMA_GET;
1674     else
1675         pd->type            = GNI_POST_RDMA_GET;
1676 #if REMOTE_EVENT
1677     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1678 #else
1679     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1680 #endif
1681     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1682     pd->length          = ALIGN64(request_msg->length);
1683     pd->local_addr      = (uint64_t) msg_data;
1684     pd->remote_addr     = request_msg->source_addr;
1685     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1686     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1687     pd->rdma_mode       = 0;
1688     pd->amo_cmd         = 0;
1689
1690     //memory registration successful
1691     if(status == GNI_RC_SUCCESS)
1692     {
1693         pd->local_mem_hndl  = msg_mem_hndl;
1694         CMI_GNI_LOCK
1695         if(pd->type == GNI_POST_RDMA_GET) 
1696             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1697         else
1698             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1699         CMI_GNI_UNLOCK
1700     }else
1701     {
1702         SetMemHndlZero(pd->local_mem_hndl);
1703     }
1704     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1705     {
1706         MallocRdmaRequest(rdma_request_msg);
1707         rdma_request_msg->next = 0;
1708         rdma_request_msg->destNode = inst_id;
1709         rdma_request_msg->pd = pd;
1710         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1711     }else {
1712         GNI_RC_CHECK("AFter posting", status);
1713     }
1714 #endif
1715 }
1716
1717 static void PumpLocalRdmaTransactions()
1718 {
1719     gni_cq_entry_t          ev;
1720     gni_return_t            status;
1721     uint64_t                type, inst_id;
1722     gni_post_descriptor_t   *tmp_pd;
1723     MSG_LIST                *ptr;
1724     CONTROL_MSG             *ack_msg_tmp;
1725     uint8_t                 msg_tag;
1726 #ifdef CMK_DIRECT
1727     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1728 #endif
1729     SMSG_QUEUE         *queue = &smsg_queue;
1730
1731     while(1) {
1732         CMI_GNI_LOCK 
1733         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1734         CMI_GNI_UNLOCK
1735         if(status != GNI_RC_SUCCESS) break;
1736         
1737         type = GNI_CQ_GET_TYPE(ev);
1738         if (type == GNI_CQ_EVENT_TYPE_POST)
1739         {
1740             inst_id     = GNI_CQ_GET_INST_ID(ev);
1741 #if PRINT_SYH
1742             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1743 #endif
1744             CMI_GNI_LOCK
1745             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1746             CMI_GNI_UNLOCK
1747 #ifdef CMK_DIRECT
1748             if(tmp_pd->amo_cmd == 1)
1749             {
1750                 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1751                 cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
1752                 cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
1753                 cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
1754             }
1755             else{
1756                 MallocControlMsg(ack_msg_tmp);
1757                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1758                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1759             }
1760 #else
1761             MallocControlMsg(ack_msg_tmp);
1762             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1763             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1764 #endif
1765             ////Message is sent, free message , put is not used now
1766             switch (tmp_pd->type) {
1767 #if CMK_PERSISTENT_COMM
1768             case GNI_POST_RDMA_PUT:
1769 #if ! USE_LRTS_MEMPOOL
1770                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1771 #endif
1772             case GNI_POST_FMA_PUT:
1773                 CmiFree((void *)tmp_pd->local_addr);
1774                 msg_tag = PUT_DONE_TAG;
1775                 break;
1776 #endif
1777 #ifdef CMK_DIRECT
1778             case GNI_POST_RDMA_PUT:
1779             case GNI_POST_FMA_PUT:
1780                 //sender ACK to receiver to trigger it is done
1781                 msg_tag = DIRECT_PUT_DONE_TAG;
1782                 break;
1783 #endif
1784             case GNI_POST_RDMA_GET:
1785             case GNI_POST_FMA_GET:
1786 #if  ! USE_LRTS_MEMPOOL
1787                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1788                 msg_tag = ACK_TAG;  
1789 #else
1790                 ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
1791                 if(ack_msg_tmp->seq_id > 0)      // BIG_MSG
1792                 {
1793                     msg_tag = BIG_MSG_TAG; 
1794                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1795                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1796                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1797                 } 
1798                 else
1799                 {
1800                     msg_tag = ACK_TAG;  
1801                     ack_msg_tmp->dest_addr = tmp_pd->local_addr;
1802                 }
1803                 ack_msg_tmp->length = tmp_pd->length;
1804                 ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1805 #endif
1806                 break;
1807             default:
1808                 CmiPrintf("type=%d\n", tmp_pd->type);
1809                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1810             }
1811 #if CMK_DIRECT
1812             if(tmp_pd->amo_cmd == 1)
1813                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1814             else
1815 #endif
1816                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
1817             if(status == GNI_RC_SUCCESS)
1818             {
1819 #if CMK_DIRECT
1820                 if(tmp_pd->amo_cmd == 1)
1821                     free(cmk_direct_done_msg); 
1822                 else
1823 #endif
1824                     FreeControlMsg(ack_msg_tmp);
1825
1826             }
1827 #if CMK_PERSISTENT_COMM
1828             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1829 #endif
1830             {
1831                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1832 #if PRINT_SYH
1833                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1834 #endif
1835 #if CMK_SMP_TRACE_COMMTHREAD
1836                     START_EVENT();
1837 #endif
1838                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1839                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1840 #if MACHINE_DEBUG_LOG
1841                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1842                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1843                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1844 #endif
1845 #if CMK_SMP_TRACE_COMMTHREAD
1846                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1847 #endif
1848                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1849                 }else if(msg_tag == BIG_MSG_TAG){
1850                     void *msg = (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1851                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1852                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1853 #if CMK_SMP_TRACE_COMMTHREAD
1854                         START_EVENT();
1855 #endif
1856 #if PRINT_SYH
1857                         printf("Pipeline msg done [%d]\n", myrank);
1858 #endif
1859 #if CMK_SMP_TRACE_COMMTHREAD
1860                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1861 #endif
1862                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1863                     }
1864                 }
1865             }
1866             FreePostDesc(tmp_pd);
1867         }
1868     } //end while
1869     if(status == GNI_RC_ERROR_RESOURCE)
1870     {
1871         GNI_RC_CHECK("Smsg_tx_cq full", status);
1872     }
1873 }
1874
1875 static void  SendRdmaMsg()
1876 {
1877     gni_return_t            status = GNI_RC_SUCCESS;
1878     gni_mem_handle_t        msg_mem_hndl;
1879     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
1880     RDMA_REQUEST            *pre = 0;
1881     int i, register_size = 0;
1882     void                    *msg;
1883 #if CMK_SMP
1884     int len = PCQueueLength(sendRdmaBuf);
1885     for (i=0; i<len; i++)
1886     {
1887         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
1888         if (ptr == NULL) break;
1889 #else
1890     ptr = sendRdmaBuf;
1891     while (ptr!=0)
1892     {
1893 #endif 
1894         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1895         gni_post_descriptor_t *pd = ptr->pd;
1896         status = GNI_RC_SUCCESS;
1897         
1898         if(pd->cqwrite_value == 0)
1899         {
1900             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
1901             {
1902                 msg = (void*)(pd->local_addr);
1903                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1904                 if(status == GNI_RC_SUCCESS)
1905                 {
1906                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1907                 }
1908             }else
1909             {
1910                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1911             }
1912             if(NoMsgInRecv( (void*)(pd->local_addr)))
1913                 register_size = GetMempoolsize((void*)(pd->local_addr));
1914             else
1915                 register_size = 0;
1916         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
1917         {
1918             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
1919         }
1920         if(status == GNI_RC_SUCCESS)        //mem register good
1921         {
1922             CMI_GNI_LOCK
1923             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1924                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1925             else
1926                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1927             CMI_GNI_UNLOCK
1928             if(status == GNI_RC_SUCCESS)    //post good
1929             {
1930 #if !CMK_SMP
1931                 tmp_ptr = ptr;
1932                 if(pre != 0) {
1933                     pre->next = ptr->next;
1934                 }
1935                 else {
1936                     sendRdmaBuf = ptr->next;
1937                 }
1938                 ptr = ptr->next;
1939                 FreeRdmaRequest(tmp_ptr);
1940 #endif
1941                 if(pd->cqwrite_value == 0)
1942                 {
1943                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
1944                 }
1945 #if MACHINE_DEBUG_LOG
1946                 buffered_recv_msg += register_size;
1947                 MACHSTATE(8, "GO request from buffered\n"); 
1948 #endif
1949             }else           // cannot post
1950             {
1951 #if CMK_SMP
1952                 PCQueuePush(sendRdmaBuf, (char*)ptr);
1953 #else
1954                 pre = ptr;
1955                 ptr = ptr->next;
1956 #endif
1957                 break;
1958             }
1959         } else          //memory registration fails
1960         {
1961 #if CMK_SMP
1962             PCQueuePush(sendRdmaBuf, (char*)ptr);
1963 #else
1964             pre = ptr;
1965             ptr = ptr->next;
1966 #endif
1967         }
1968     } //end while
1969 #if ! CMK_SMP
1970     if(ptr == 0)
1971         sendRdmaTail = pre;
1972 #endif
1973 }
1974
1975 // return 1 if all messages are sent
1976 static int SendBufferMsg(SMSG_QUEUE *queue)
1977 {
1978     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
1979     CONTROL_MSG         *control_msg_tmp;
1980     gni_return_t        status;
1981     int                 done = 1;
1982     int                 register_size;
1983     void                *register_addr;
1984     int                 index_previous = -1;
1985     int                 index = queue->smsg_head_index;
1986 #if CMI_EXERT_SEND_CAP
1987     int                 sent_cnt = 0;
1988 #endif
1989
1990 #if !CMK_SMP
1991     index = queue->smsg_head_index;
1992 #else
1993     index = 0;
1994 #endif
1995 #if CMK_SMP
1996     while(index <mysize)
1997     {
1998         int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
1999         for (i=0; i<len; i++) 
2000         {
2001             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
2002             if (ptr == 0) break;
2003 #else
2004     while(index != -1)
2005     {
2006         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2007         pre = 0;
2008         while(ptr != 0)
2009         {
2010 #endif
2011             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2012             status = GNI_RC_ERROR_RESOURCE;
2013             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2014                 /* connection not exists yet */
2015             }
2016             else
2017             switch(ptr->tag)
2018             {
2019             case SMALL_DATA_TAG:
2020                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2021                 if(status == GNI_RC_SUCCESS)
2022                 {
2023                     CmiFree(ptr->msg);
2024                 }
2025                 break;
2026             case LMSG_INIT_TAG:
2027                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2028                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2029                 break;
2030             case   ACK_TAG:
2031             case   BIG_MSG_TAG:
2032                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
2033                 if(status == GNI_RC_SUCCESS)
2034                 {
2035                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2036                 }
2037                 break;
2038 #ifdef CMK_DIRECT
2039             case DIRECT_PUT_DONE_TAG:
2040                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2041                 if(status == GNI_RC_SUCCESS)
2042                 {
2043                     free((CMK_DIRECT_HEADER*)ptr->msg);
2044                 }
2045                 break;
2046
2047 #endif
2048             default:
2049                 printf("Weird tag\n");
2050                 CmiAbort("should not happen\n");
2051             }       // end switch
2052             if(status == GNI_RC_SUCCESS)
2053             {
2054 #if !CMK_SMP
2055                 tmp_ptr = ptr;
2056                 if(pre)
2057                 {
2058                     ptr = pre ->next = ptr->next;
2059                 }else
2060                 {
2061                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2062                 }
2063                 FreeMsgList(tmp_ptr);
2064 #else
2065                 FreeMsgList(ptr);
2066 #endif
2067 #if PRINT_SYH
2068                 buffered_smsg_counter--;
2069                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2070 #endif
2071 #if CMI_EXERT_SEND_CAP
2072                 sent_cnt++;
2073                 if(sent_cnt == SEND_CAP)
2074                     break;
2075 #endif
2076             }else {
2077 #if CMK_SMP
2078                 PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
2079 #else
2080                 pre = ptr;
2081                 ptr=ptr->next;
2082 #endif
2083                 done = 0;
2084                 if(status == GNI_RC_ERROR_RESOURCE)
2085                     break;
2086             } 
2087         } //end while
2088 #if !CMK_SMP
2089         if(ptr == 0)
2090             queue->smsg_msglist_index[index].tail = pre;
2091         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2092         {
2093             if(index_previous != -1)
2094                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2095             else
2096                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2097         }else
2098         {index_previous = index;
2099         }
2100         index = queue->smsg_msglist_index[index].next;
2101 #else
2102         index++;
2103 #endif
2104
2105 #if CMI_EXERT_SEND_CAP
2106         if(sent_cnt == SEND_CAP)
2107                 break;
2108 #endif
2109     }   // end pooling for all cores
2110     return done;
2111 }
2112
2113 static void ProcessDeadlock();
2114 void LrtsAdvanceCommunication(int whileidle)
2115 {
2116     /*  Receive Msg first */
2117 #if CMK_SMP_TRACE_COMMTHREAD
2118     double startT, endT;
2119 #endif
2120     if (useDynamicSMSG && whileidle)
2121     {
2122 #if CMK_SMP_TRACE_COMMTHREAD
2123         startT = CmiWallTimer();
2124 #endif
2125         PumpDatagramConnection();
2126 #if CMK_SMP_TRACE_COMMTHREAD
2127         endT = CmiWallTimer();
2128         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2129 #endif
2130     }
2131
2132 #if CMK_SMP_TRACE_COMMTHREAD
2133     startT = CmiWallTimer();
2134 #endif
2135     PumpNetworkSmsg();
2136     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2137 #if CMK_SMP_TRACE_COMMTHREAD
2138     endT = CmiWallTimer();
2139     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2140 #endif
2141
2142 #if CMK_SMP_TRACE_COMMTHREAD
2143     startT = CmiWallTimer();
2144 #endif
2145     PumpLocalRdmaTransactions();
2146     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2147 #if CMK_SMP_TRACE_COMMTHREAD
2148     endT = CmiWallTimer();
2149     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2150 #endif
2151     /* Send buffered Message */
2152 #if CMK_SMP_TRACE_COMMTHREAD
2153     startT = CmiWallTimer();
2154 #endif
2155 #if CMK_USE_OOB
2156     if (SendBufferMsg(&smsg_oob_queue) == 1)
2157 #endif
2158     SendBufferMsg(&smsg_queue);
2159     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2160 #if CMK_SMP_TRACE_COMMTHREAD
2161     endT = CmiWallTimer();
2162     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2163 #endif
2164
2165 #if CMK_SMP_TRACE_COMMTHREAD
2166     startT = CmiWallTimer();
2167 #endif
2168     SendRdmaMsg();
2169     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2170 #if CMK_SMP_TRACE_COMMTHREAD
2171     endT = CmiWallTimer();
2172     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2173 #endif
2174
2175 #if CMK_SMP
2176     if (_detected_hang)  ProcessDeadlock();
2177 #endif
2178 }
2179
2180 /* useDynamicSMSG */
2181 static void _init_dynamic_smsg()
2182 {
2183     gni_return_t status;
2184     uint32_t     vmdh_index = -1;
2185     int i;
2186
2187     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2188     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2189     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2190     for(i=0; i<mysize; i++) {
2191         smsg_connected_flag[i] = 0;
2192         smsg_attr_vector_local[i] = NULL;
2193         smsg_attr_vector_remote[i] = NULL;
2194     }
2195     if(mysize <=512)
2196     {
2197         SMSG_MAX_MSG = 4096;
2198     }else if (mysize <= 4096)
2199     {
2200         SMSG_MAX_MSG = 4096/mysize * 1024;
2201     }else if (mysize <= 16384)
2202     {
2203         SMSG_MAX_MSG = 512;
2204     }else {
2205         SMSG_MAX_MSG = 256;
2206     }
2207
2208     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2209     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2210     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2211     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2212     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2213
2214     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2215     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2216     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2217     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2218     mailbox_list->offset = 0;
2219     mailbox_list->next = 0;
2220     
2221     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2222         mailbox_list->size, smsg_rx_cqh,
2223         GNI_MEM_READWRITE,   
2224         vmdh_index,
2225         &(mailbox_list->mem_hndl));
2226     GNI_RC_CHECK("MEMORY registration for smsg", status);
2227
2228     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2229     GNI_RC_CHECK("Unbound EP", status);
2230     
2231     alloc_smsg_attr(&send_smsg_attr);
2232
2233     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2234     GNI_RC_CHECK("post unbound datagram", status);
2235
2236       /* always pre-connect to proc 0 */
2237     //if (myrank != 0) connect_to(0);
2238 }
2239
2240 static void _init_static_smsg()
2241 {
2242     gni_smsg_attr_t      *smsg_attr;
2243     gni_smsg_attr_t      remote_smsg_attr;
2244     gni_smsg_attr_t      *smsg_attr_vec;
2245     gni_mem_handle_t     my_smsg_mdh_mailbox;
2246     int      ret, i;
2247     gni_return_t status;
2248     uint32_t              vmdh_index = -1;
2249     mdh_addr_t            base_infor;
2250     mdh_addr_t            *base_addr_vec;
2251     char *env;
2252
2253     if(mysize <=512)
2254     {
2255         SMSG_MAX_MSG = 1024;
2256     }else if (mysize <= 4096)
2257     {
2258         SMSG_MAX_MSG = 1024;
2259     }else if (mysize <= 16384)
2260     {
2261         SMSG_MAX_MSG = 512;
2262     }else {
2263         SMSG_MAX_MSG = 256;
2264     }
2265     
2266     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2267     if (env) SMSG_MAX_MSG = atoi(env);
2268     CmiAssert(SMSG_MAX_MSG > 0);
2269
2270     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2271     
2272     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2273     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2274     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2275     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2276     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2277     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2278     CmiAssert(ret == 0);
2279     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2280     
2281     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2282             smsg_memlen*(mysize), smsg_rx_cqh,
2283             GNI_MEM_READWRITE,   
2284             vmdh_index,
2285             &my_smsg_mdh_mailbox);
2286     register_memory_size += smsg_memlen*(mysize);
2287     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2288
2289     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2290     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2291
2292     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2293     base_infor.mdh =  my_smsg_mdh_mailbox;
2294     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2295
2296     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2297  
2298     for(i=0; i<mysize; i++)
2299     {
2300         if(i==myrank)
2301             continue;
2302         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2303         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2304         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2305         smsg_attr[i].mbox_offset = i*smsg_memlen;
2306         smsg_attr[i].buff_size = smsg_memlen;
2307         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2308         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2309     }
2310
2311     for(i=0; i<mysize; i++)
2312     {
2313         if (myrank == i) continue;
2314
2315         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2316         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2317         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2318         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2319         remote_smsg_attr.buff_size = smsg_memlen;
2320         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2321         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2322
2323         /* initialize the smsg channel */
2324         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2325         GNI_RC_CHECK("SMSG Init", status);
2326     } //end initialization
2327
2328     free(base_addr_vec);
2329     free(smsg_attr);
2330
2331     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2332     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2333
2334
2335 inline
2336 static void _init_send_queue(SMSG_QUEUE *queue)
2337 {
2338      int i;
2339      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2340      for(i =0; i<mysize; i++)
2341      {
2342         queue->smsg_msglist_index[i].next = -1;
2343 #if CMK_SMP
2344         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2345 #else
2346         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2347 #endif
2348         
2349      }
2350      queue->smsg_head_index = -1;
2351 }
2352
2353 inline
2354 static void _init_smsg()
2355 {
2356     if(mysize > 1) {
2357         if (useDynamicSMSG)
2358             _init_dynamic_smsg();
2359         else
2360             _init_static_smsg();
2361     }
2362
2363     _init_send_queue(&smsg_queue);
2364 #if CMK_USE_OOB
2365     _init_send_queue(&smsg_oob_queue);
2366 #endif
2367 }
2368
2369 static void _init_static_msgq()
2370 {
2371     gni_return_t status;
2372     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2373     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2374     msgq_attrs.smsg_q_sz = 1;
2375     msgq_attrs.rcv_pool_sz = 1;
2376     msgq_attrs.num_msgq_eps = 2;
2377     msgq_attrs.nloc_insts = 8;
2378     msgq_attrs.modes = 0;
2379     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2380
2381     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2382     GNI_RC_CHECK("MSGQ Init", status);
2383
2384
2385 }
2386
2387 #if CMK_SMP && STEAL_MEMPOOL
2388 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2389 {
2390     void *pool = NULL;
2391     int i, k;
2392     // check other ranks
2393     for (k=0; k<CmiMyNodeSize()+1; k++) {
2394         i = (CmiMyRank()+k)%CmiMyNodeSize();
2395         if (i==CmiMyRank()) continue;
2396         mempool_type *mptr = CpvAccessOther(mempool, i);
2397         CmiLock(mptr->mempoolLock);
2398         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2399         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2400             CmiUnlock(mptr->mempoolLock);
2401             continue;
2402         }
2403         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2404         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2405             /* search in the free list */
2406           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2407           mempool_header *current = free_header;
2408           while (current) {
2409             if (current->next_free == (char*)header-(char*)mptr) break;
2410             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2411           }
2412           if (current == NULL) {         /* not found in free list */
2413             CmiUnlock(mptr->mempoolLock);
2414             continue;
2415           }
2416 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2417             /* search the previous memblock, and remove the tail */
2418           mempool_block *ptr = (mempool_block *)mptr;
2419           while (ptr) {
2420             if (ptr->memblock_next == mptr->memblock_tail) break;
2421             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2422           }
2423           CmiAssert(ptr!=NULL);
2424           ptr->memblock_next = 0;
2425           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2426
2427             /* remove memblock from the free list */
2428           current->next_free = header->next_free;
2429           if (header == free_header) mptr->freelist_head = header->next_free;
2430
2431           CmiUnlock(mptr->mempoolLock);
2432
2433           pool = (void*)tail;
2434           *mem_hndl = tail->mem_hndl;
2435           *size = tail->size;
2436           return pool;
2437         }
2438         CmiUnlock(mptr->mempoolLock);
2439     }
2440
2441       /* steal failed, deregister and free memblock now */
2442     int freed = 0;
2443     for (k=0; k<CmiMyNodeSize()+1; k++) {
2444         i = (CmiMyRank()+k)%CmiMyNodeSize();
2445         mempool_type *mptr = CpvAccessOther(mempool, i);
2446         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2447
2448         mempool_block *mempools_head = &(mptr->mempools_head);
2449         mempool_block *current = mempools_head;
2450         mempool_block *prev = NULL;
2451
2452         while (current) {
2453           int isfree = 0;
2454           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2455 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2456           mempool_header *cur = free_header;
2457           mempool_header *header;
2458           if (current != mempools_head) {
2459             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2460              /* search in free list */
2461             if (header->size == current->size - sizeof(mempool_block)) {
2462               cur = free_header;
2463               while (cur) {
2464                 if (cur->next_free == (char*)header-(char*)mptr) break;
2465                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2466               }
2467               if (cur != NULL) isfree = 1;
2468             }
2469           }
2470           if (isfree) {
2471               /* remove from free list */
2472             cur->next_free = header->next_free;
2473             if (header == free_header) mptr->freelist_head = header->next_free;
2474              // deregister
2475             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2476             GNI_RC_CHECK("steal Mempool de-register", status);
2477             mempool_block *ptr = current;
2478             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2479             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2480 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2481             freed += ptr->size;
2482             free(ptr);
2483              // try now
2484             if (freed > *size) {
2485               if (pool == NULL) {
2486                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2487                 CmiAssert(ret == 0);
2488               }
2489               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2490               if (status == GNI_RC_SUCCESS) {
2491                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2492 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2493                 return pool;
2494               }
2495 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2496             }
2497           }
2498           else {
2499              prev = current;
2500              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2501           }
2502         }
2503
2504         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2505     }
2506       /* still no luck registering pool */
2507     if (pool) free(pool);
2508     return NULL;
2509 }
2510 #endif
2511
2512 static long long int total_mempool_size = 0;
2513 static long long int total_mempool_calls = 0;
2514
2515 #if USE_LRTS_MEMPOOL
2516 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2517 {
2518     void *pool;
2519     int ret;
2520
2521     int default_size =  expand_flag? _expand_mem : _mempool_size;
2522     if (*size < default_size) *size = default_size;
2523     total_mempool_size += *size;
2524     total_mempool_calls += 1;
2525     if (*size > MAX_REG_MEM) {
2526         printf("Error: A mempool block with size %d is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX.", *size);
2527         CmiAbort("alloc_mempool_block");
2528     }
2529     ret = posix_memalign(&pool, ALIGNBUF, *size);
2530     if (ret != 0) {
2531 #if CMK_SMP && STEAL_MEMPOOL
2532       pool = steal_mempool_block(size, mem_hndl);
2533       if (pool != NULL) return pool;
2534 #endif
2535       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2536       if (ret == ENOMEM)
2537         CmiAbort("alloc_mempool_block: out of memory.");
2538       else
2539         CmiAbort("alloc_mempool_block: posix_memalign failed");
2540     }
2541     SetMemHndlZero((*mem_hndl));
2542     
2543     return pool;
2544 }
2545
2546 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2547 {
2548     if(!(IsMemHndlZero(mem_hndl)))
2549     {
2550         gni_return_t status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2551         GNI_RC_CHECK("free_mempool_block Mempool de-register", status);
2552     }
2553     free(ptr);
2554 }
2555 #endif
2556
2557 void LrtsPreCommonInit(int everReturn){
2558 #if USE_LRTS_MEMPOOL
2559     CpvInitialize(mempool_type*, mempool);
2560     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block);
2561     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2562 #endif
2563 }
2564
2565
2566 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2567 {
2568     register int            i;
2569     int                     rc;
2570     int                     device_id = 0;
2571     unsigned int            remote_addr;
2572     gni_cdm_handle_t        cdm_hndl;
2573     gni_return_t            status = GNI_RC_SUCCESS;
2574     uint32_t                vmdh_index = -1;
2575     uint8_t                 ptag;
2576     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2577     int                     first_spawned;
2578     int                     physicalID;
2579     char                   *env;
2580
2581     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2582     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2583     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2584    
2585     status = PMI_Init(&first_spawned);
2586     GNI_RC_CHECK("PMI_Init", status);
2587
2588     status = PMI_Get_size(&mysize);
2589     GNI_RC_CHECK("PMI_Getsize", status);
2590
2591     status = PMI_Get_rank(&myrank);
2592     GNI_RC_CHECK("PMI_getrank", status);
2593
2594     //physicalID = CmiPhysicalNodeID(myrank);
2595     
2596     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2597
2598     *myNodeID = myrank;
2599     *numNodes = mysize;
2600   
2601 #if MULTI_THREAD_SEND
2602     /* Currently, we only consider the case that comm. thread will only recv msgs */
2603     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2604 #endif
2605
2606     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2607     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2608     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2609
2610     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2611     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2612     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2613
2614     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
2615     if (env) useDynamicSMSG = 1;
2616     if (!useDynamicSMSG)
2617       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2618     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2619     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2620     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2621     
2622     if(myrank == 0)
2623     {
2624         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2625         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2626     }
2627 #ifdef USE_ONESIDED
2628     onesided_init(NULL, &onesided_hnd);
2629
2630     // this is a GNI test, so use the libonesided bypass functionality
2631     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2632     local_addr = gniGetNicAddress();
2633 #else
2634     ptag = get_ptag();
2635     cookie = get_cookie();
2636 #if 0
2637     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2638 #endif
2639     //Create and attach to the communication  domain */
2640     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2641     GNI_RC_CHECK("GNI_CdmCreate", status);
2642     //* device id The device id is the minor number for the device
2643     //that is assigned to the device by the system when the device is created.
2644     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2645     //where X is the device number 0 default 
2646     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2647     GNI_RC_CHECK("GNI_CdmAttach", status);
2648     local_addr = get_gni_nic_address(0);
2649 #endif
2650     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2651     _MEMCHECK(MPID_UGNI_AllAddr);
2652     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2653     /* create the local completion queue */
2654     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2655     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2656     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2657     
2658     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2659     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2660     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2661
2662     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2663     GNI_RC_CHECK("Create CQ (rx)", status);
2664     
2665     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2666     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2667     
2668     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2669     //GNI_RC_CHECK("Create BTE CQ", status);
2670
2671     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2672     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2673     _MEMCHECK(ep_hndl_array);
2674 #if CMK_SMP && !COMM_THREAD_SEND
2675     tx_cq_lock  = CmiCreateLock();
2676     rx_cq_lock  = CmiCreateLock();
2677 #endif
2678     for (i=0; i<mysize; i++) {
2679         if(i == myrank) continue;
2680         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2681         GNI_RC_CHECK("GNI_EpCreate ", status);   
2682         remote_addr = MPID_UGNI_AllAddr[i];
2683         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2684         GNI_RC_CHECK("GNI_EpBind ", status);   
2685     }
2686
2687     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2688     _init_smsg();
2689     PMI_Barrier();
2690
2691 #if     USE_LRTS_MEMPOOL
2692     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
2693     if (env) _mempool_size = CmiReadSize(env);
2694     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
2695         _mempool_size = CmiReadSize(env);
2696
2697
2698     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2699     if (env) MAX_REG_MEM = CmiReadSize(env);
2700     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size")) 
2701         MAX_REG_MEM = CmiReadSize(env);
2702
2703     env = getenv("CHARM_UGNI_SEND_MAX");
2704     if (env) MAX_BUFF_SEND = CmiReadSize(env);
2705     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send")) 
2706         MAX_BUFF_SEND = CmiReadSize(env);
2707
2708     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2709     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2710
2711     if (myrank==0) {
2712         printf("Charm++> memory pool init size: %1.fMB\n", _mempool_size/1024.0/1024);
2713         printf("Charm++> memory pool max size: %1.fMB, max for send: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
2714         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
2715             /* memblock can expand to BIG_MSG * 2 size */
2716             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
2717             CmiAbort("mempool maximum size is too small. \n");
2718         }
2719 #if CMK_SMP && MULTI_THREAD_SEND
2720         printf("Charm++> worker thread sending messages\n");
2721 #elif CMK_SMP && COMM_THREAD_SEND
2722         printf("Charm++> only comm thread send/recv messages\n");
2723 #endif
2724     }
2725 #endif
2726
2727     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
2728     if (env) {
2729         BIG_MSG = CmiReadSize(env);
2730         if (BIG_MSG < ONE_SEG)
2731           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
2732     }
2733     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
2734     if (env) {
2735         BIG_MSG_PIPELINE = atoi(env);
2736     }
2737
2738     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
2739     if (env) _checkProgress = 0;
2740     if (mysize == 1) _checkProgress = 0;
2741
2742     /* init DMA buffer for medium message */
2743
2744     //_init_DMA_buffer();
2745     
2746     free(MPID_UGNI_AllAddr);
2747 #if CMK_SMP
2748     sendRdmaBuf = PCQueueCreate();
2749 #else
2750     sendRdmaBuf = 0;
2751 #endif
2752 #if MACHINE_DEBUG_LOG
2753     char ln[200];
2754     sprintf(ln,"debugLog.%d",myrank);
2755     debugLog=fopen(ln,"w");
2756 #endif
2757
2758 }
2759
2760 void* LrtsAlloc(int n_bytes, int header)
2761 {
2762     void *ptr;
2763 #if 0
2764     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2765 #endif
2766     if(n_bytes <= SMSG_MAX_MSG)
2767     {
2768         int totalsize = n_bytes+header;
2769         ptr = malloc(totalsize);
2770     }
2771     else {
2772         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2773 #if     USE_LRTS_MEMPOOL
2774         n_bytes = ALIGN64(n_bytes);
2775         if(n_bytes < BIG_MSG)
2776         {
2777             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2778             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2779         }else 
2780         {
2781             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2782             ptr = res + ALIGNBUF - header;
2783
2784         }
2785 #else
2786         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2787         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2788         ptr = res + ALIGNBUF - header;
2789 #endif
2790     }
2791     return ptr;
2792 }
2793
2794 void  LrtsFree(void *msg)
2795 {
2796     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2797     if (size <= SMSG_MAX_MSG)
2798       free(msg);
2799     else if(size>=BIG_MSG)
2800     {
2801         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2802
2803     }else
2804     {
2805 #if     USE_LRTS_MEMPOOL
2806 #if CMK_SMP
2807         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2808 #else
2809         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2810 #endif
2811 #else
2812         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2813 #endif
2814     }
2815 }
2816
2817 void LrtsExit()
2818 {
2819     /* free memory ? */
2820 #if USE_LRTS_MEMPOOL
2821     mempool_destroy(CpvAccess(mempool));
2822 #endif
2823     PMI_Finalize();
2824     exit(0);
2825 }
2826
2827 void LrtsDrainResources()
2828 {
2829     if(mysize == 1) return;
2830     while (
2831 #if CMK_USE_OOB
2832            !SendBufferMsg(&smsg_oob_queue) ||
2833 #endif
2834            !SendBufferMsg(&smsg_queue))
2835     {
2836         if (useDynamicSMSG)
2837             PumpDatagramConnection();
2838         PumpNetworkSmsg();
2839         PumpLocalRdmaTransactions();
2840         SendRdmaMsg();
2841     }
2842     PMI_Barrier();
2843 }
2844
2845 void LrtsAbort(const char *message) {
2846     printf("CmiAbort is calling on PE:%d\n", myrank);
2847     CmiPrintStackTrace(0);
2848     PMI_Abort(-1, message);
2849 }
2850
2851 /**************************  TIMER FUNCTIONS **************************/
2852 #if CMK_TIMER_USE_SPECIAL
2853 /* MPI calls are not threadsafe, even the timer on some machines */
2854 static CmiNodeLock  timerLock = 0;
2855 static int _absoluteTime = 0;
2856 static int _is_global = 0;
2857 static struct timespec start_ts;
2858
2859 inline int CmiTimerIsSynchronized() {
2860     return 0;
2861 }
2862
2863 inline int CmiTimerAbsolute() {
2864     return _absoluteTime;
2865 }
2866
2867 double CmiStartTimer() {
2868     return 0.0;
2869 }
2870
2871 double CmiInitTime() {
2872     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
2873 }
2874
2875 void CmiTimerInit(char **argv) {
2876     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2877     if (_absoluteTime && CmiMyPe() == 0)
2878         printf("Charm++> absolute  timer is used\n");
2879     
2880     _is_global = CmiTimerIsSynchronized();
2881
2882
2883     if (_is_global) {
2884         if (CmiMyRank() == 0) {
2885             clock_gettime(CLOCK_MONOTONIC, &start_ts);
2886         }
2887     } else { /* we don't have a synchronous timer, set our own start time */
2888         CmiBarrier();
2889         CmiBarrier();
2890         CmiBarrier();
2891         clock_gettime(CLOCK_MONOTONIC, &start_ts);
2892     }
2893     CmiNodeAllBarrier();          /* for smp */
2894 }
2895
2896 /**
2897  * Since the timerLock is never created, and is
2898  * always NULL, then all the if-condition inside
2899  * the timer functions could be disabled right
2900  * now in the case of SMP.
2901  */
2902 double CmiTimer(void) {
2903     struct timespec now_ts;
2904     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2905     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2906         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2907 }
2908
2909 double CmiWallTimer(void) {
2910     struct timespec now_ts;
2911     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2912     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2913         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
2914 }
2915
2916 double CmiCpuTimer(void) {
2917     struct timespec now_ts;
2918     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2919     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2920         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2921 }
2922
2923 #endif
2924 /************Barrier Related Functions****************/
2925
2926 int CmiBarrier()
2927 {
2928     gni_return_t status;
2929
2930 #if CMK_SMP
2931     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2932     CmiNodeAllBarrier();
2933     if (CmiMyRank() == CmiMyNodeSize())
2934 #else
2935     if (CmiMyRank() == 0)
2936 #endif
2937     {
2938         /**
2939          *  The call of CmiBarrier is usually before the initialization
2940          *  of trace module of Charm++, therefore, the START_EVENT
2941          *  and END_EVENT are disabled here. -Chao Mei
2942          */
2943         /*START_EVENT();*/
2944         status = PMI_Barrier();
2945         GNI_RC_CHECK("PMI_Barrier", status);
2946         /*END_EVENT(10);*/
2947     }
2948     CmiNodeAllBarrier();
2949     return status;
2950
2951 }
2952 #if CMK_DIRECT
2953 #include "machine-cmidirect.c"
2954 #endif
2955 #if CMK_PERSISTENT_COMM
2956 #include "machine-persistent.c"
2957 #endif
2958
2959