friendly warning message for completion queue full
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
12     export CHARM_UGNI_MEMPOOL_MAX=20M
13     export CHARM_UGNI_SEND_MAX=10M
14
15     CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
16     CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
17
18     other environment variables:
19
20     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
21     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
22     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
23     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
24  */
25 /*@{*/
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <stdint.h>
30 #include <errno.h>
31 #include <malloc.h>
32 #include <unistd.h>
33 #include <time.h>
34
35 #include <gni_pub.h>
36 #include <pmi.h>
37
38 #include "converse.h"
39
40 #define PRINT_SYH  0
41
42 #define USE_LRTS_MEMPOOL                  1
43 #define REMOTE_EVENT                      0
44
45 #define CMI_EXERT_SEND_CAP      0
46 #define CMI_EXERT_RECV_CAP      0
47
48 #if CMI_EXERT_SEND_CAP
49 #define SEND_CAP 16
50 #endif
51
52 #if CMI_EXERT_RECV_CAP
53 #define RECV_CAP 2
54 #endif
55
56 #if CMK_SMP
57 #define COMM_THREAD_SEND 1
58 //#define MULTI_THREAD_SEND 1
59 #endif
60
61 // Trace communication thread
62 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
63 #define TRACE_THRESHOLD     0.00005
64 #define CMI_MPI_TRACE_MOREDETAILED 0
65 #undef CMI_MPI_TRACE_USEREVENTS
66 #define CMI_MPI_TRACE_USEREVENTS 1
67 #else
68 #undef CMK_SMP_TRACE_COMMTHREAD
69 #define CMK_SMP_TRACE_COMMTHREAD 0
70 #endif
71
72 #define CMK_TRACE_COMMOVERHEAD 0
73 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
74 #undef CMI_MPI_TRACE_USEREVENTS
75 #define CMI_MPI_TRACE_USEREVENTS 1
76 #else
77 #undef CMK_TRACE_COMMOVERHEAD
78 #define CMK_TRACE_COMMOVERHEAD 0
79 #endif
80
81 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
82 CpvStaticDeclare(double, projTraceStart);
83 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
84 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
85 #else
86 #define  START_EVENT()
87 #define  END_EVENT(x)
88 #endif
89
90 #if USE_LRTS_MEMPOOL
91
92 #define oneMB (1024ll*1024)
93 #define oneGB (1024ll*1024*1024)
94
95 static CmiInt8 _mempool_size = 8*oneMB;
96 static CmiInt8 _expand_mem =  4*oneMB;
97
98 static CmiInt8 _totalmem = 0.8*oneGB;
99
100 static int BIG_MSG  =  4*oneMB;
101 static int ONE_SEG  =  2*oneMB;
102 static int BIG_MSG_PIPELINE = 4;
103
104 // dynamic flow control
105 static CmiInt8 buffered_send_msg = 0;
106 static int   register_memory_size = 0;
107
108 #if CMK_SMP && COMM_THREAD_SEND 
109 //Dynamic flow control about memory registration
110 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
111 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
112 #else
113 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
114 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
115 #endif
116
117 #endif     /* end USE_LRTS_MEMPOOL */
118
119 #if CMK_SMP && MULTI_THREAD_SEND
120 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
121 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
122 #else
123 #define     CMI_GNI_LOCK
124 #define     CMI_GNI_UNLOCK
125 #endif
126
127 static int   user_set_flag  = 0;
128
129 static int _checkProgress = 1;             /* check deadlock */
130 static int _detected_hang = 0;
131
132 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
133
134 // dynamic SMSG
135 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
136
137 static int avg_smsg_connection = 32;
138 static int                 *smsg_connected_flag= 0;
139 static gni_smsg_attr_t     **smsg_attr_vector_local;
140 static gni_smsg_attr_t     **smsg_attr_vector_remote;
141 static gni_ep_handle_t     ep_hndl_unbound;
142 static gni_smsg_attr_t     send_smsg_attr;
143 static gni_smsg_attr_t     recv_smsg_attr;
144
145 typedef struct _dynamic_smsg_mailbox{
146    void     *mailbox_base;
147    int      size;
148    int      offset;
149    gni_mem_handle_t  mem_hndl;
150    struct      _dynamic_smsg_mailbox  *next;
151 }dynamic_smsg_mailbox_t;
152
153 static dynamic_smsg_mailbox_t  *mailbox_list;
154
155 int         rdma_id = 0;
156
157 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
158 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
159
160 #if PRINT_SYH
161 int         lrts_send_msg_id = 0;
162 int         lrts_local_done_msg = 0;
163 int         lrts_send_rdma_success = 0;
164 #endif
165
166 #include "machine.h"
167
168 #include "pcqueue.h"
169
170 #include "mempool.h"
171
172 #if CMK_PERSISTENT_COMM
173 #include "machine-persistent.h"
174 #endif
175
176 //#define  USE_ONESIDED 1
177 #ifdef USE_ONESIDED
178 //onesided implementation is wrong, since no place to restore omdh
179 #include "onesided.h"
180 onesided_hnd_t   onesided_hnd;
181 onesided_md_t    omdh;
182 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
183
184 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
185
186 #else
187 uint8_t   onesided_hnd, omdh;
188 #if REMOTE_EVENT
189 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
190          status = GNI_RC_ERROR_NOMEM;} \
191         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
192                 if(status == GNI_RC_SUCCESS) register_memory_size += size;} }
193 #else
194 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
195     do {   \
196         if (register_memory_size + size >= MAX_REG_MEM) { \
197             status = GNI_RC_ERROR_NOMEM; \
198         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
199             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
200     } while(0)
201 #endif
202 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
203     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
204              register_memory_size -= size; \
205          else CmiAbort("MEM_DEregister");  \
206     } while (0)
207 #endif
208
209 #define   GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
210 #define   IncreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)++
211 #define   DecreaseMsgInRecv(x)   (GetMempoolBlockPtr(x)->msgs_in_recv)--
212 #define   IncreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)++
213 #define   DecreaseMsgInSend(x)   (GetMempoolBlockPtr(x)->msgs_in_send)--
214 #define   GetMempoolPtr(x)        GetMempoolBlockPtr(x)->mptr
215 #define   GetMempoolsize(x)       GetMempoolBlockPtr(x)->size
216 #define   GetMemHndl(x)           GetMempoolBlockPtr(x)->mem_hndl
217 #define   GetMemHndlFromHeader(x) ((block_header*)x)->mem_hndl
218 #define   GetSizeFromHeader(x)    ((block_header*)x)->size
219 #define   NoMsgInSend(x)          GetMempoolBlockPtr(x)->msgs_in_send == 0
220 #define   NoMsgInRecv(x)          GetMempoolBlockPtr(x)->msgs_in_recv == 0
221 #define   NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv  == 0)
222 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
223 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
224 #define   NotRegistered(x)        IsMemHndlZero(((block_header*)x)->mem_hndl)
225
226 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
227 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
228 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
229 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
230
231 #define ALIGNBUF                64
232
233 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
234 /* If SMSG is not used */
235
236 #define FMA_PER_CORE  1024
237 #define FMA_BUFFER_SIZE 1024
238
239 /* If SMSG is used */
240 static int  SMSG_MAX_MSG = 1024;
241 #define SMSG_MAX_CREDIT 72 
242
243 #define MSGQ_MAXSIZE       2048
244 /* large message transfer with FMA or BTE */
245 #define LRTS_GNI_RDMA_THRESHOLD  1024 
246
247 #if CMK_SMP
248 static int  REMOTE_QUEUE_ENTRIES=163840; 
249 static int LOCAL_QUEUE_ENTRIES=163840; 
250 #else
251 static int  REMOTE_QUEUE_ENTRIES=20480;
252 static int LOCAL_QUEUE_ENTRIES=20480; 
253 #endif
254
255 #define BIG_MSG_TAG  0x26
256 #define PUT_DONE_TAG      0x28
257 #define DIRECT_PUT_DONE_TAG      0x29
258 #define ACK_TAG           0x30
259 /* SMSG is data message */
260 #define SMALL_DATA_TAG          0x31
261 /* SMSG is a control message to initialize a BTE */
262 #define MEDIUM_HEAD_TAG         0x32
263 #define MEDIUM_DATA_TAG         0x33
264 #define LMSG_INIT_TAG           0x39 
265 #define VERY_LMSG_INIT_TAG      0x40 
266 #define VERY_LMSG_TAG           0x41 
267
268 #define DEBUG
269 #ifdef GNI_RC_CHECK
270 #undef GNI_RC_CHECK
271 #endif
272 #ifdef DEBUG
273 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
274 #else
275 #define GNI_RC_CHECK(msg,rc)
276 #endif
277
278 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
279 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
280
281 static int useStaticMSGQ = 0;
282 static int useStaticFMA = 0;
283 static int mysize, myrank;
284 gni_nic_handle_t      nic_hndl;
285
286 typedef struct {
287     gni_mem_handle_t mdh;
288     uint64_t addr;
289 } mdh_addr_t ;
290 // this is related to dynamic SMSG
291
292 typedef struct mdh_addr_list{
293     gni_mem_handle_t mdh;
294    void *addr;
295     struct mdh_addr_list *next;
296 }mdh_addr_list_t;
297
298 static unsigned int         smsg_memlen;
299 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
300 mdh_addr_t          setup_mem;
301 mdh_addr_t          *smsg_connection_vec = 0;
302 gni_mem_handle_t    smsg_connection_memhndl;
303 static int          smsg_expand_slots = 10;
304 static int          smsg_available_slot = 0;
305 static void         *smsg_mailbox_mempool = 0;
306 mdh_addr_list_t     *smsg_dynamic_list = 0;
307
308 static void             *smsg_mailbox_base;
309 gni_msgq_attr_t         msgq_attrs;
310 gni_msgq_handle_t       msgq_handle;
311 gni_msgq_ep_attr_t      msgq_ep_attrs;
312 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
313
314 /* =====Beginning of Declarations of Machine Specific Variables===== */
315 static int cookie;
316 static int modes = 0;
317 static gni_cq_handle_t       smsg_rx_cqh = NULL;
318 static gni_cq_handle_t       smsg_tx_cqh = NULL;
319 static gni_cq_handle_t       post_rx_cqh = NULL;
320 static gni_cq_handle_t       post_tx_cqh = NULL;
321 static gni_ep_handle_t       *ep_hndl_array;
322 #if CMK_SMP && MULTI_THREAD_SEND
323 static CmiNodeLock           *ep_lock_array;
324 static CmiNodeLock           tx_cq_lock; 
325 static CmiNodeLock           rx_cq_lock;
326 static CmiNodeLock           *mempool_lock;
327 #endif
328
329 typedef struct msg_list
330 {
331     uint32_t destNode;
332     uint32_t size;
333     void *msg;
334     uint8_t tag;
335 #if !CMK_SMP
336     struct msg_list *next;
337 #endif
338 }MSG_LIST;
339
340
341 typedef struct control_msg
342 {
343     uint64_t            source_addr;    /* address from the start of buffer  */
344     uint64_t            dest_addr;      /* address from the start of buffer */
345     //int                 source;         /* source rank */
346     int                 total_length;   /* total length */
347     int                 length;         /* length of this packet */
348     uint8_t             seq_id;         //big message   0 meaning single message
349     gni_mem_handle_t    source_mem_hndl;
350     struct control_msg *next;
351 }CONTROL_MSG;
352
353 #ifdef CMK_DIRECT
354 typedef struct{
355     void *recverBuf;
356     void (*callbackFnPtr)(void *);
357     void *callbackData;
358 }CMK_DIRECT_HEADER;
359 #endif
360 typedef struct  rmda_msg
361 {
362     int                   destNode;
363     gni_post_descriptor_t *pd;
364 #if !CMK_SMP
365     struct  rmda_msg      *next;
366 #endif
367 }RDMA_REQUEST;
368
369 #if CMK_SMP
370 PCQueue sendRdmaBuf;
371 typedef struct  msg_list_index
372 {
373     int         next;
374     PCQueue     sendSmsgBuf;
375 } MSG_LIST_INDEX;
376 #else
377 static RDMA_REQUEST        *sendRdmaBuf = 0;
378 static RDMA_REQUEST        *sendRdmaTail = 0;
379 typedef struct  msg_list_index
380 {
381     int         next;
382     MSG_LIST    *sendSmsgBuf;
383     MSG_LIST    *tail;
384 } MSG_LIST_INDEX;
385 #endif
386
387 /* reuse PendingMsg memory */
388 static CONTROL_MSG          *control_freelist=0;
389 static MSG_LIST             *msglist_freelist=0;
390
391 // buffered send queue
392 typedef struct smsg_queue
393 {
394     MSG_LIST_INDEX   *smsg_msglist_index;
395     int               smsg_head_index;
396 } SMSG_QUEUE;
397
398 SMSG_QUEUE                  smsg_queue;
399 SMSG_QUEUE                  smsg_oob_queue;
400
401 #if CMK_SMP
402
403 #define FreeMsgList(d)   free(d);
404 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
405
406 #else
407
408 #define FreeMsgList(d)  \
409   (d)->next = msglist_freelist;\
410   msglist_freelist = d;
411
412 #define MallocMsgList(d) \
413   d = msglist_freelist;\
414   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
415              _MEMCHECK(d);\
416   } else msglist_freelist = d->next; \
417   d->next =0;
418 #endif
419
420 #if CMK_SMP
421
422 #define FreeControlMsg(d)      free(d);
423 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
424
425 #else
426
427 #define FreeControlMsg(d)       \
428   (d)->next = control_freelist;\
429   control_freelist = d;
430
431 #define MallocControlMsg(d) \
432   d = control_freelist;\
433   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
434              _MEMCHECK(d);\
435   } else control_freelist = d->next;
436
437 #endif
438
439 static RDMA_REQUEST         *rdma_freelist = NULL;
440
441 #define FreeMediumControlMsg(d)       \
442   (d)->next = medium_control_freelist;\
443   medium_control_freelist = d;
444
445
446 #define MallocMediumControlMsg(d) \
447     d = medium_control_freelist;\
448     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
449     _MEMCHECK(d);\
450 } else mediumcontrol_freelist = d->next;
451
452 # if CMK_SMP
453 #define FreeRdmaRequest(d)       free(d);
454 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
455 #else
456
457 #define FreeRdmaRequest(d)       \
458   (d)->next = rdma_freelist;\
459   rdma_freelist = d;
460
461 #define MallocRdmaRequest(d) \
462   d = rdma_freelist;\
463   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
464              _MEMCHECK(d);\
465   } else rdma_freelist = d->next; \
466     d->next =0;
467 #endif
468
469 /* reuse gni_post_descriptor_t */
470 static gni_post_descriptor_t *post_freelist=0;
471
472 #if  !CMK_SMP
473 #define FreePostDesc(d)       \
474     (d)->next_descr = post_freelist;\
475     post_freelist = d;
476
477 #define MallocPostDesc(d) \
478   d = post_freelist;\
479   if (d==0) { \
480      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
481      _MEMCHECK(d);\
482   } else post_freelist = d->next_descr;
483 #else
484
485 #define FreePostDesc(d)     free(d);
486 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
487
488 #endif
489
490
491 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
492 static int      buffered_smsg_counter = 0;
493
494 /* SmsgSend return success but message sent is not confirmed by remote side */
495 static MSG_LIST *buffered_fma_head = 0;
496 static MSG_LIST *buffered_fma_tail = 0;
497
498 /* functions  */
499 #define IsFree(a,ind)  !( a& (1<<(ind) ))
500 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
501 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
502
503 CpvDeclare(mempool_type*, mempool);
504
505 /* get the upper bound of log 2 */
506 int mylog2(int size)
507 {
508     int op = size;
509     unsigned int ret=0;
510     unsigned int mask = 0;
511     int i;
512     while(op>0)
513     {
514         op = op >> 1;
515         ret++;
516
517     }
518     for(i=1; i<ret; i++)
519     {
520         mask = mask << 1;
521         mask +=1;
522     }
523
524     ret -= ((size &mask) ? 0:1);
525     return ret;
526 }
527
528 static void
529 allgather(void *in,void *out, int len)
530 {
531     static int *ivec_ptr=NULL,already_called=0,job_size=0;
532     int i,rc;
533     int my_rank;
534     char *tmp_buf,*out_ptr;
535
536     if(!already_called) {
537
538         rc = PMI_Get_size(&job_size);
539         CmiAssert(rc == PMI_SUCCESS);
540         rc = PMI_Get_rank(&my_rank);
541         CmiAssert(rc == PMI_SUCCESS);
542
543         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
544         CmiAssert(ivec_ptr != NULL);
545
546         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
547         CmiAssert(rc == PMI_SUCCESS);
548
549         already_called = 1;
550
551     }
552
553     tmp_buf = (char *)malloc(job_size * len);
554     CmiAssert(tmp_buf);
555
556     rc = PMI_Allgather(in,tmp_buf,len);
557     CmiAssert(rc == PMI_SUCCESS);
558
559     out_ptr = out;
560
561     for(i=0;i<job_size;i++) {
562
563         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
564
565     }
566
567     free(tmp_buf);
568 }
569
570 static void
571 allgather_2(void *in,void *out, int len)
572 {
573     //PMI_Allgather is out of order
574     int i,rc, extend_len;
575     int  rank_index;
576     char *out_ptr, *out_ref;
577     char *in2;
578
579     extend_len = sizeof(int) + len;
580     in2 = (char*)malloc(extend_len);
581
582     memcpy(in2, &myrank, sizeof(int));
583     memcpy(in2+sizeof(int), in, len);
584
585     out_ptr = (char*)malloc(mysize*extend_len);
586
587     rc = PMI_Allgather(in2, out_ptr, extend_len);
588     GNI_RC_CHECK("allgather", rc);
589
590     out_ref = out;
591
592     for(i=0;i<mysize;i++) {
593         //rank index 
594         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
595         //copy to the rank index slot
596         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
597     }
598
599     free(out_ptr);
600     free(in2);
601
602 }
603
604 static unsigned int get_gni_nic_address(int device_id)
605 {
606     unsigned int address, cpu_id;
607     gni_return_t status;
608     int i, alps_dev_id=-1,alps_address=-1;
609     char *token, *p_ptr;
610
611     p_ptr = getenv("PMI_GNI_DEV_ID");
612     if (!p_ptr) {
613         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
614        
615         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
616     } else {
617         while ((token = strtok(p_ptr,":")) != NULL) {
618             alps_dev_id = atoi(token);
619             if (alps_dev_id == device_id) {
620                 break;
621             }
622             p_ptr = NULL;
623         }
624         CmiAssert(alps_dev_id != -1);
625         p_ptr = getenv("PMI_GNI_LOC_ADDR");
626         CmiAssert(p_ptr != NULL);
627         i = 0;
628         while ((token = strtok(p_ptr,":")) != NULL) {
629             if (i == alps_dev_id) {
630                 alps_address = atoi(token);
631                 break;
632             }
633             p_ptr = NULL;
634             ++i;
635         }
636         CmiAssert(alps_address != -1);
637         address = alps_address;
638     }
639     return address;
640 }
641
642 static uint8_t get_ptag(void)
643 {
644     char *p_ptr, *token;
645     uint8_t ptag;
646
647     p_ptr = getenv("PMI_GNI_PTAG");
648     CmiAssert(p_ptr != NULL);
649     token = strtok(p_ptr, ":");
650     ptag = (uint8_t)atoi(token);
651     return ptag;
652         
653 }
654
655 static uint32_t get_cookie(void)
656 {
657     uint32_t cookie;
658     char *p_ptr, *token;
659
660     p_ptr = getenv("PMI_GNI_COOKIE");
661     CmiAssert(p_ptr != NULL);
662     token = strtok(p_ptr, ":");
663     cookie = (uint32_t)atoi(token);
664
665     return cookie;
666 }
667
668 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
669 /* TODO: add any that are related */
670 /* =====End of Definitions of Message-Corruption Related Macros=====*/
671
672
673 #include "machine-lrts.h"
674 #include "machine-common-core.c"
675
676 /* Network progress function is used to poll the network when for
677    messages. This flushes receive buffers on some  implementations*/
678 #if CMK_MACHINE_PROGRESS_DEFINED
679 void CmiMachineProgressImpl() {
680 }
681 #endif
682
683 static void SendRdmaMsg();
684 static void PumpNetworkSmsg();
685 static void PumpLocalRdmaTransactions();
686 static int SendBufferMsg(SMSG_QUEUE *queue);
687
688 #if MACHINE_DEBUG_LOG
689 FILE *debugLog = NULL;
690 static CmiInt8 buffered_recv_msg = 0;
691 int         lrts_smsg_success = 0;
692 int         lrts_received_msg = 0;
693 #endif
694
695 static void sweep_mempool(mempool_type *mptr)
696 {
697     block_header *current = &(mptr->block_head);
698
699     printf("[n %d] sweep_mempool slot START.\n", myrank);
700     while( current!= NULL) {
701         printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
702         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
703     }
704     printf("[n %d] sweep_mempool slot END.\n", myrank);
705 }
706
707 inline
708 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
709 {
710     block_header *current = *from;
711
712     //while(register_memory_size>= MAX_REG_MEM)
713     //{
714         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
715             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
716
717         *from = current;
718         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
719         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh, GetSizeFromHeader(current));
720         SetMemHndlZero(GetMemHndlFromHeader(current));
721     //}
722     return GNI_RC_SUCCESS;
723 }
724
725 inline 
726 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
727 {
728     gni_return_t status = GNI_RC_SUCCESS;
729     //int size = GetMempoolsize(msg);
730     //void *blockaddr = GetMempoolBlockPtr(msg);
731     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
732    
733     block_header *current = &(mptr->block_head);
734     while(register_memory_size>= MAX_REG_MEM)
735     {
736         status = deregisterMemory(mptr, &current);
737         if (status != GNI_RC_SUCCESS) break;
738     }
739     if(register_memory_size>= MAX_REG_MEM) return status;
740
741     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
742     while(1)
743     {
744         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
745         if(status == GNI_RC_SUCCESS)
746         {
747             break;
748         }
749         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
750         {
751             CmiAbort("Memory registor for mempool fails\n");
752         }
753         else
754         {
755             status = deregisterMemory(mptr, &current);
756             if (status != GNI_RC_SUCCESS) break;
757         }
758     }; 
759     return status;
760 }
761
762 inline 
763 static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
764 {
765     static int rank = -1;
766     int i;
767     gni_return_t status;
768     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
769     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
770     mempool_type *mptr;
771
772     status = registerFromMempool(mptr1, msg, size, t);
773     if (status == GNI_RC_SUCCESS) return status;
774 #if CMK_SMP 
775     for (i=0; i<CmiMyNodeSize()+1; i++) {
776       rank = (rank+1)%(CmiMyNodeSize()+1);
777       mptr = CpvAccessOther(mempool, rank);
778       if (mptr == mptr1) continue;
779       status = registerFromMempool(mptr, msg, size, t);
780       if (status == GNI_RC_SUCCESS) return status;
781     }
782 #endif
783     return  GNI_RC_ERROR_RESOURCE;
784 }
785
786 inline
787 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
788 {
789     MSG_LIST        *msg_tmp;
790     MallocMsgList(msg_tmp);
791     msg_tmp->destNode = destNode;
792     msg_tmp->size   = size;
793     msg_tmp->msg    = msg;
794     msg_tmp->tag    = tag;
795
796 #if !CMK_SMP
797     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
798         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
799         queue->smsg_head_index = destNode;
800         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
801     }else
802     {
803         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
804         queue->smsg_msglist_index[destNode].tail = msg_tmp;
805     }
806 #else
807     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
808 #endif
809 #if PRINT_SYH
810     buffered_smsg_counter++;
811 #endif
812 }
813
814 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
815 {
816     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
817 }
818
819 inline
820 static void setup_smsg_connection(int destNode)
821 {
822     mdh_addr_list_t  *new_entry = 0;
823     gni_post_descriptor_t *pd;
824     gni_smsg_attr_t      *smsg_attr;
825     gni_return_t status = GNI_RC_NOT_DONE;
826     RDMA_REQUEST        *rdma_request_msg;
827     
828     if(smsg_available_slot == smsg_expand_slots)
829     {
830         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
831         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
832         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
833
834         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
835             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
836             GNI_MEM_READWRITE,   
837             -1,
838             &(new_entry->mdh));
839         smsg_available_slot = 0; 
840         new_entry->next = smsg_dynamic_list;
841         smsg_dynamic_list = new_entry;
842     }
843     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
844     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
845     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
846     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
847     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
848     smsg_attr->buff_size = smsg_memlen;
849     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
850     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
851     smsg_local_attr_vec[destNode] = smsg_attr;
852     smsg_available_slot++;
853     MallocPostDesc(pd);
854     pd->type            = GNI_POST_FMA_PUT;
855     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
856     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
857     pd->length          = sizeof(gni_smsg_attr_t);
858     pd->local_addr      = (uint64_t) smsg_attr;
859     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
860     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
861     pd->src_cq_hndl     = 0;
862     pd->rdma_mode       = 0;
863     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
864     print_smsg_attr(smsg_attr);
865     if(status == GNI_RC_ERROR_RESOURCE )
866     {
867         MallocRdmaRequest(rdma_request_msg);
868         rdma_request_msg->destNode = destNode;
869         rdma_request_msg->pd = pd;
870         /* buffer this request */
871     }
872 #if PRINT_SYH
873     if(status != GNI_RC_SUCCESS)
874        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
875     else
876         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
877 #endif
878 }
879
880 /* useDynamicSMSG */
881 inline 
882 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
883 {
884     gni_return_t status = GNI_RC_NOT_DONE;
885
886     if(mailbox_list->offset == mailbox_list->size)
887     {
888         dynamic_smsg_mailbox_t *new_mailbox_entry;
889         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
890         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
891         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
892         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
893         new_mailbox_entry->offset = 0;
894         
895         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
896             new_mailbox_entry->size, smsg_rx_cqh,
897             GNI_MEM_READWRITE,   
898             -1,
899             &(new_mailbox_entry->mem_hndl));
900
901         GNI_RC_CHECK("register", status);
902         new_mailbox_entry->next = mailbox_list;
903         mailbox_list = new_mailbox_entry;
904     }
905     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
906     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
907     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
908     local_smsg_attr->mbox_offset = mailbox_list->offset;
909     mailbox_list->offset += smsg_memlen;
910     local_smsg_attr->buff_size = smsg_memlen;
911     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
912     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
913 }
914
915 /* useDynamicSMSG */
916 inline 
917 static int connect_to(int destNode)
918 {
919     gni_return_t status = GNI_RC_NOT_DONE;
920     CmiAssert(smsg_connected_flag[destNode] == 0);
921     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
922     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
923     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
924     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
925     
926     CMI_GNI_LOCK
927     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
928     CMI_GNI_UNLOCK
929     if (status == GNI_RC_ERROR_RESOURCE) {
930       /* possibly destNode is making connection at the same time */
931       free(smsg_attr_vector_local[destNode]);
932       smsg_attr_vector_local[destNode] = NULL;
933       free(smsg_attr_vector_remote[destNode]);
934       smsg_attr_vector_remote[destNode] = NULL;
935       mailbox_list->offset -= smsg_memlen;
936       return 0;
937     }
938     GNI_RC_CHECK("GNI_Post", status);
939     smsg_connected_flag[destNode] = 1;
940     return 1;
941 }
942
943 inline 
944 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
945 {
946     unsigned int          remote_address;
947     uint32_t              remote_id;
948     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
949     gni_smsg_attr_t       *smsg_attr;
950     gni_post_descriptor_t *pd;
951     gni_post_state_t      post_state;
952     char                  *real_data; 
953
954     if (useDynamicSMSG) {
955         switch (smsg_connected_flag[destNode]) {
956         case 0: 
957             connect_to(destNode);         /* continue to case 1 */
958         case 1:                           /* pending connection, do nothing */
959             status = GNI_RC_NOT_DONE;
960             if(inbuff ==0)
961                 buffer_small_msgs(queue, msg, size, destNode, tag);
962             return status;
963         }
964     }
965 #if CMK_SMP
966     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
967     {
968 #else
969     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
970     {
971 #endif
972         CMI_GNI_LOCK
973         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
974         CMI_GNI_UNLOCK
975         if(status == GNI_RC_SUCCESS)
976         {
977 #if CMK_SMP_TRACE_COMMTHREAD
978             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
979             { 
980                 START_EVENT();
981                 if ( tag == SMALL_DATA_TAG)
982                     real_data = (char*)msg; 
983                 else 
984                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
985                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
986             }
987 #endif
988             smsg_send_count ++;
989             return status;
990         }else
991             status = GNI_RC_ERROR_RESOURCE;
992     }
993     if(inbuff ==0)
994         buffer_small_msgs(queue, msg, size, destNode, tag);
995     return status;
996 }
997
998
999 inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1000 {
1001     /* construct a control message and send */
1002     CONTROL_MSG         *control_msg_tmp;
1003     MallocControlMsg(control_msg_tmp);
1004     control_msg_tmp->source_addr = (uint64_t)msg;
1005     control_msg_tmp->seq_id    = seqno;
1006     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1007 #if     USE_LRTS_MEMPOOL
1008     if(size < BIG_MSG)
1009     {
1010         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1011     }
1012     else
1013     {
1014         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1015         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1016         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1017     }
1018 #else
1019     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1020 #endif
1021     return control_msg_tmp;
1022 }
1023
1024 #define BLOCKING_SEND_CONTROL    0
1025
1026 // Large message, send control to receiver, receiver register memory and do a GET, 
1027 // return 1 - send no success
1028 inline
1029 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1030 {
1031     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1032     uint32_t            vmdh_index  = -1;
1033     int                 size;
1034     int                 offset = 0;
1035     uint64_t            source_addr;
1036     int                 register_size; 
1037     void                *msg;
1038
1039     size    =   control_msg_tmp->total_length;
1040     source_addr = control_msg_tmp->source_addr;
1041     register_size = control_msg_tmp->length;
1042
1043 #if  USE_LRTS_MEMPOOL
1044     if( control_msg_tmp->seq_id == 0 ){
1045 #if BLOCKING_SEND_CONTROL
1046         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1047             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1048                 LrtsAdvanceCommunication(0);
1049         }
1050 #endif
1051         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1052         {
1053             msg = (void*)source_addr;
1054             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1055             {
1056                 if(!inbuff)
1057                     buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1058                 return GNI_RC_ERROR_NOMEM;
1059             }
1060             //register the corresponding mempool
1061             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1062             if(status == GNI_RC_SUCCESS)
1063             {
1064                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1065             }
1066         }else
1067         {
1068             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1069             status = GNI_RC_SUCCESS;
1070         }
1071         if(NoMsgInSend( control_msg_tmp->source_addr))
1072             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1073         else
1074             register_size = 0;
1075     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1076     {
1077         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1078         source_addr += offset;
1079         size = control_msg_tmp->length;
1080 #if BLOCKING_SEND_CONTROL
1081         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1082             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1083                 LrtsAdvanceCommunication(0);
1084         }
1085 #endif
1086         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1087             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1088             {
1089                 if(!inbuff)
1090                     buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1091                 return GNI_RC_ERROR_NOMEM;
1092             }
1093             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1094             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1095         }
1096         else
1097         {
1098             status = GNI_RC_SUCCESS;
1099         }
1100         register_size = 0;  
1101     }
1102
1103     if(status == GNI_RC_SUCCESS)
1104     {
1105         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
1106         if(status == GNI_RC_SUCCESS)
1107         {
1108             buffered_send_msg += register_size;
1109             if(control_msg_tmp->seq_id == 0)
1110             {
1111                 IncreaseMsgInSend(source_addr);
1112             }
1113             FreeControlMsg(control_msg_tmp);
1114             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1115         }else
1116             status = GNI_RC_ERROR_RESOURCE;
1117
1118     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1119     {
1120         CmiAbort("Memory registor for large msg\n");
1121     }else 
1122     {
1123         status = GNI_RC_ERROR_NOMEM; 
1124         if(!inbuff)
1125             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1126     }
1127     return status;
1128 #else
1129     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1130     if(status == GNI_RC_SUCCESS)
1131     {
1132         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
1133         if(status == GNI_RC_SUCCESS)
1134         {
1135             FreeControlMsg(control_msg_tmp);
1136         }
1137     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1138     {
1139         CmiAbort("Memory registor for large msg\n");
1140     }else 
1141     {
1142         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1143     }
1144     return status;
1145 #endif
1146 }
1147
1148 inline void LrtsPrepareEnvelope(char *msg, int size)
1149 {
1150     CmiSetMsgSize(msg, size);
1151 }
1152
1153 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1154 {
1155     gni_return_t        status  =   GNI_RC_SUCCESS;
1156     uint8_t tag;
1157     CONTROL_MSG         *control_msg_tmp;
1158     int                 oob = ( mode & OUT_OF_BAND);
1159     SMSG_QUEUE          *queue;
1160
1161     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1162 #if CMK_USE_OOB
1163     queue = oob? &smsg_oob_queue : &smsg_queue;
1164 #else
1165     queue = &smsg_queue;
1166 #endif
1167
1168     LrtsPrepareEnvelope(msg, size);
1169
1170 #if PRINT_SYH
1171     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1172 #endif 
1173 #if CMK_SMP && COMM_THREAD_SEND
1174     if(size <= SMSG_MAX_MSG)
1175         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1176     else if (size < BIG_MSG) {
1177         control_msg_tmp =  construct_control_msg(size, msg, 0);
1178         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1179     }
1180     else {
1181           CmiSetMsgSeq(msg, 0);
1182           control_msg_tmp =  construct_control_msg(size, msg, 1);
1183           buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1184     }
1185 #else   //non-smp, smp(worker sending)
1186     if(size <= SMSG_MAX_MSG)
1187     {
1188         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1189             CmiFree(msg);
1190     }
1191     else if (size < BIG_MSG) {
1192         control_msg_tmp =  construct_control_msg(size, msg, 0);
1193         send_large_messages(queue, destNode, control_msg_tmp, 0);
1194     }
1195     else {
1196 #if     USE_LRTS_MEMPOOL
1197         CmiSetMsgSeq(msg, 0);
1198         control_msg_tmp =  construct_control_msg(size, msg, 1);
1199         send_large_messages(queue, destNode, control_msg_tmp, 0);
1200 #else
1201         control_msg_tmp =  construct_control_msg(size, msg, 0);
1202         send_large_messages(queue, destNode, control_msg_tmp, 0);
1203 #endif
1204     }
1205 #endif
1206     return 0;
1207 }
1208
1209 static void    PumpDatagramConnection();
1210 static void registerUserTraceEvents() {
1211 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1212     traceRegisterUserEvent("setting up connections", 10);
1213     traceRegisterUserEvent("Receiving small msgs", 20);
1214     traceRegisterUserEvent("Release local transaction", 30);
1215     traceRegisterUserEvent("Sending buffered small msgs", 40);
1216     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1217 #endif
1218 }
1219
1220 static void ProcessDeadlock()
1221 {
1222     static CmiUInt8 *ptr = NULL;
1223     static CmiUInt8  last = 0, mysum, sum;
1224     static int count = 0;
1225     gni_return_t status;
1226     int i;
1227
1228 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1229 //sweep_mempool(CpvAccess(mempool));
1230     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1231     mysum = smsg_send_count + smsg_recv_count;
1232     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1233     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1234     GNI_RC_CHECK("PMI_Allgather", status);
1235     sum = 0;
1236     for (i=0; i<mysize; i++)  sum+= ptr[i];
1237     if (last == 0 || sum == last) 
1238         count++;
1239     else
1240         count = 0;
1241     last = sum;
1242     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1243     if (count == 2) { 
1244         /* detected twice, it is a real deadlock */
1245         if (myrank == 0)  {
1246             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %d and %d).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1247             CmiAbort("Fatal> Deadlock detected.");
1248         }
1249     }
1250     _detected_hang = 0;
1251 }
1252
1253 static void CheckProgress()
1254 {
1255     if (smsg_send_count == last_smsg_send_count &&
1256         smsg_recv_count == last_smsg_recv_count ) 
1257     {
1258         _detected_hang = 1;
1259 #if !CMK_SMP
1260         if (_detected_hang) ProcessDeadlock();
1261 #endif
1262
1263     }
1264     else {
1265         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1266         last_smsg_send_count = smsg_send_count;
1267         last_smsg_recv_count = smsg_recv_count;
1268         _detected_hang = 0;
1269     }
1270 }
1271
1272 static void set_limit()
1273 {
1274     if (!user_set_flag && CmiMyRank() == 0) {
1275         int mynode = CmiPhysicalNodeID(CmiMyPe());
1276         int numpes = CmiNumPesOnPhysicalNode(mynode);
1277         int numprocesses = numpes / CmiMyNodeSize();
1278         MAX_REG_MEM  = _totalmem / numprocesses;
1279         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1280         if (CmiMyPe() == 0)
1281            printf("mem_max = %d, send_max =%d\n", MAX_REG_MEM, MAX_BUFF_SEND);
1282     }
1283 }
1284
1285 void LrtsPostCommonInit(int everReturn)
1286 {
1287 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1288     CpvInitialize(double, projTraceStart);
1289     /* only PE 0 needs to care about registration (to generate sts file). */
1290     if (CmiMyPe() == 0) {
1291         registerMachineUserEventsFunction(&registerUserTraceEvents);
1292     }
1293 #endif
1294
1295 #if CMK_SMP
1296     CmiIdleState *s=CmiNotifyGetState();
1297     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1298     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1299 #else
1300     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1301     if (useDynamicSMSG)
1302     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1303 #endif
1304
1305     if (_checkProgress)
1306 #if CMK_SMP
1307     if (CmiMyRank() == 0)
1308 #endif
1309     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1310
1311     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1312 }
1313
1314 /* this is called by worker thread */
1315 void LrtsPostNonLocal(){
1316 #if CMK_SMP
1317 #if MULTI_THREAD_SEND
1318     if(mysize == 1) return;
1319     PumpLocalRdmaTransactions();
1320 #if CMK_USE_OOB
1321     if (SendBufferMsg(&smsg_oob_queue) == 1)
1322 #endif
1323     SendBufferMsg(&smsg_queue);
1324     SendRdmaMsg();
1325 #endif
1326 #endif
1327 }
1328
1329 /* useDynamicSMSG */
1330 static void    PumpDatagramConnection()
1331 {
1332     uint32_t          remote_address;
1333     uint32_t          remote_id;
1334     gni_return_t status;
1335     gni_post_state_t  post_state;
1336     uint64_t          datagram_id;
1337     int i;
1338
1339    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1340    {
1341        if (datagram_id >= mysize) {           /* bound endpoint */
1342            int pe = datagram_id - mysize;
1343            CMI_GNI_LOCK
1344            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1345            CMI_GNI_UNLOCK
1346            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1347            {
1348                CmiAssert(remote_id == pe);
1349                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1350                GNI_RC_CHECK("Dynamic SMSG Init", status);
1351 #if PRINT_SYH
1352                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1353 #endif
1354                CmiAssert(smsg_connected_flag[pe] == 1);
1355                smsg_connected_flag[pe] = 2;
1356            }
1357        }
1358        else {         /* unbound ep */
1359            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1360            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1361            {
1362                CmiAssert(remote_id<mysize);
1363                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1364                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1365                GNI_RC_CHECK("Dynamic SMSG Init", status);
1366 #if PRINT_SYH
1367                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1368 #endif
1369                smsg_connected_flag[remote_id] = 2;
1370
1371                alloc_smsg_attr(&send_smsg_attr);
1372                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1373                GNI_RC_CHECK("post unbound datagram", status);
1374            }
1375        }
1376    }
1377 }
1378
1379 /* pooling CQ to receive network message */
1380 static void PumpNetworkRdmaMsgs()
1381 {
1382     gni_cq_entry_t      event_data;
1383     gni_return_t        status;
1384
1385 }
1386
1387 inline 
1388 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1389 {
1390     RDMA_REQUEST        *rdma_request_msg;
1391     MallocRdmaRequest(rdma_request_msg);
1392     rdma_request_msg->destNode = inst_id;
1393     rdma_request_msg->pd = pd;
1394 #if CMK_SMP
1395     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1396 #else
1397     if(sendRdmaBuf == 0)
1398     {
1399         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1400     }else{
1401         sendRdmaTail->next = rdma_request_msg;
1402         sendRdmaTail =  rdma_request_msg;
1403     }
1404 #endif
1405
1406 }
1407 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1408 static void PumpNetworkSmsg()
1409 {
1410     uint64_t            inst_id;
1411     int                 ret;
1412     gni_cq_entry_t      event_data;
1413     gni_return_t        status, status2;
1414     void                *header;
1415     uint8_t             msg_tag;
1416     int                 msg_nbytes;
1417     void                *msg_data;
1418     gni_mem_handle_t    msg_mem_hndl;
1419     gni_smsg_attr_t     *smsg_attr;
1420     gni_smsg_attr_t     *remote_smsg_attr;
1421     int                 init_flag;
1422     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1423     uint64_t            source_addr;
1424     SMSG_QUEUE         *queue = &smsg_queue;
1425
1426     while(1)
1427     {
1428         CMI_GNI_LOCK
1429         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1430         CMI_GNI_UNLOCK
1431         if(status != GNI_RC_SUCCESS)
1432             break;
1433         inst_id = GNI_CQ_GET_INST_ID(event_data);
1434         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1435 #if PRINT_SYH
1436         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1437 #endif
1438         if (useDynamicSMSG) {
1439             /* subtle: smsg may come before connection is setup */
1440             while (smsg_connected_flag[inst_id] != 2) 
1441                PumpDatagramConnection();
1442         }
1443         msg_tag = GNI_SMSG_ANY_TAG;
1444         while(1) {
1445             CMI_GNI_LOCK
1446             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1447             CMI_GNI_UNLOCK
1448             if (status != GNI_RC_SUCCESS)
1449                 break;
1450 #if PRINT_SYH
1451             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1452 #endif
1453             /* copy msg out and then put into queue (small message) */
1454             switch (msg_tag) {
1455             case SMALL_DATA_TAG:
1456             {
1457                 START_EVENT();
1458                 msg_nbytes = CmiGetMsgSize(header);
1459                 msg_data    = CmiAlloc(msg_nbytes);
1460                 memcpy(msg_data, (char*)header, msg_nbytes);
1461 #if CMK_SMP_TRACE_COMMTHREAD
1462                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1463 #endif
1464                 handleOneRecvedMsg(msg_nbytes, msg_data);
1465                 break;
1466             }
1467             case LMSG_INIT_TAG:
1468             {
1469                 getLargeMsgRequest(header, inst_id);
1470                 break;
1471             }
1472             case ACK_TAG:   //msg fit into mempool
1473             {
1474                 /* Get is done, release message . Now put is not used yet*/
1475                 void *msg = (void*)(((CONTROL_MSG *)header)->source_addr);
1476 #if ! USE_LRTS_MEMPOOL
1477                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh, header_tmp->length);
1478 #else
1479                 DecreaseMsgInSend(msg);
1480 #endif
1481                 if(NoMsgInSend(msg))
1482                     buffered_send_msg -= GetMempoolsize(msg);
1483                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1484                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
1485                 break;
1486             }
1487             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1488             {
1489                 header_tmp = (CONTROL_MSG *) header;
1490                 void *msg = (void*)(header_tmp->source_addr);
1491                 int cur_seq = CmiGetMsgSeq(msg);
1492                 int offset = ONE_SEG*(cur_seq+1);
1493                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1494                 buffered_send_msg -= header_tmp->length;
1495                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1496                 if (remain_size < 0) remain_size = 0;
1497                 CmiSetMsgSize(msg, remain_size);
1498                 if(remain_size <= 0) //transaction done
1499                 {
1500                     CmiFree(msg);
1501                 }else if (header_tmp->total_length > offset)
1502                 {
1503                     CmiSetMsgSeq(msg, cur_seq+1);
1504                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1505                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1506                     //send next seg
1507                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1508                          // pipelining
1509                     if (header_tmp->seq_id == 1) {
1510                       int i;
1511                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1512                         int seq = cur_seq+i+2;
1513                         CmiSetMsgSeq(msg, seq-1);
1514                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1515                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1516                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1517                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1518                       }
1519                     }
1520                 }
1521                 break;
1522             }
1523 #if CMK_PERSISTENT_COMM
1524             case PUT_DONE_TAG: //persistent message
1525                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1526                 int size = ((CONTROL_MSG *) header)->length;
1527                 CmiReference(msg);
1528                 handleOneRecvedMsg(size, msg); 
1529                 break;
1530 #endif
1531 #ifdef CMK_DIRECT
1532             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1533                 (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1534                 break;
1535 #endif
1536             default: {
1537                 printf("weird tag problem\n");
1538                 CmiAbort("Unknown tag\n");
1539                      }
1540             }               // end switch
1541 #if PRINT_SYH
1542             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1543 #endif
1544             CMI_GNI_LOCK
1545             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1546             CMI_GNI_UNLOCK
1547             smsg_recv_count ++;
1548             msg_tag = GNI_SMSG_ANY_TAG;
1549         } //endwhile getNext
1550     }   //end while GetEvent
1551     if(status == GNI_RC_ERROR_RESOURCE)
1552     {
1553         printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");  
1554         GNI_RC_CHECK("Smsg_rx_cq full", status);
1555     }
1556 }
1557
1558 static void printDesc(gni_post_descriptor_t *pd)
1559 {
1560     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1561 }
1562
1563 // for BIG_MSG called on receiver side for receiving control message
1564 // LMSG_INIT_TAG
1565 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1566 {
1567 #if     USE_LRTS_MEMPOOL
1568     CONTROL_MSG         *request_msg;
1569     gni_return_t        status = GNI_RC_SUCCESS;
1570     void                *msg_data;
1571     gni_post_descriptor_t *pd;
1572     gni_mem_handle_t    msg_mem_hndl;
1573     int source, size, transaction_size, offset = 0;
1574     int     register_size = 0;
1575
1576     // initial a get to transfer data from the sender side */
1577     request_msg = (CONTROL_MSG *) header;
1578     size = request_msg->total_length;
1579     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1580     if(request_msg->seq_id < 2)   {
1581         msg_data = CmiAlloc(size);
1582         CmiSetMsgSeq(msg_data, 0);
1583         _MEMCHECK(msg_data);
1584     }
1585     else {
1586         offset = ONE_SEG*(request_msg->seq_id-1);
1587         msg_data = (char*)request_msg->dest_addr + offset;
1588     }
1589    
1590     MallocPostDesc(pd);
1591     pd->cqwrite_value = request_msg->seq_id;
1592     if( request_msg->seq_id == 0)
1593     {
1594         pd->local_mem_hndl= GetMemHndl(msg_data);
1595         transaction_size = ALIGN64(size);
1596         if(IsMemHndlZero(pd->local_mem_hndl))
1597         {   
1598             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1599             if(status == GNI_RC_SUCCESS)
1600             {
1601                 pd->local_mem_hndl = GetMemHndl(msg_data);
1602             }
1603             else
1604             {
1605                 SetMemHndlZero(pd->local_mem_hndl);
1606             }
1607         }
1608         if(NoMsgInRecv( (void*)(msg_data)))
1609             register_size = GetMempoolsize((void*)(msg_data));
1610         else
1611             register_size = 0;
1612     }
1613     else{
1614         transaction_size = ALIGN64(request_msg->length);
1615         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1616         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1617         {
1618             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1619         }
1620     }
1621     pd->first_operand = ALIGN64(size);                   //  total length
1622
1623     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1624         pd->type            = GNI_POST_FMA_GET;
1625     else
1626         pd->type            = GNI_POST_RDMA_GET;
1627 #if REMOTE_EVENT
1628     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1629 #else
1630     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1631 #endif
1632     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1633     pd->length          = transaction_size;
1634     pd->local_addr      = (uint64_t) msg_data;
1635     pd->remote_addr     = request_msg->source_addr + offset;
1636     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1637     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1638     pd->rdma_mode       = 0;
1639     pd->amo_cmd         = 0;
1640
1641     //memory registration success
1642     if(status == GNI_RC_SUCCESS)
1643     {
1644         CMI_GNI_LOCK
1645         if(pd->type == GNI_POST_RDMA_GET) 
1646             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1647         else
1648             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1649         CMI_GNI_UNLOCK
1650
1651         if(status == GNI_RC_SUCCESS )
1652         {
1653             if(pd->cqwrite_value == 0)
1654             {
1655 #if MACHINE_DEBUG_LOG
1656                 buffered_recv_msg += register_size;
1657                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1658 #endif
1659                 IncreaseMsgInRecv(msg_data);
1660             }
1661         }
1662     }else
1663     {
1664         SetMemHndlZero((pd->local_mem_hndl));
1665     }
1666     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1667     {
1668         bufferRdmaMsg(inst_id, pd); 
1669     }else {
1670          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1671         GNI_RC_CHECK("GetLargeAFter posting", status);
1672     }
1673 #else
1674     CONTROL_MSG         *request_msg;
1675     gni_return_t        status;
1676     void                *msg_data;
1677     gni_post_descriptor_t *pd;
1678     RDMA_REQUEST        *rdma_request_msg;
1679     gni_mem_handle_t    msg_mem_hndl;
1680     //int source;
1681     // initial a get to transfer data from the sender side */
1682     request_msg = (CONTROL_MSG *) header;
1683     msg_data = CmiAlloc(request_msg->length);
1684     _MEMCHECK(msg_data);
1685
1686     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1687
1688     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1689     {
1690         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1691     }
1692
1693     MallocPostDesc(pd);
1694     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1695         pd->type            = GNI_POST_FMA_GET;
1696     else
1697         pd->type            = GNI_POST_RDMA_GET;
1698 #if REMOTE_EVENT
1699     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1700 #else
1701     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1702 #endif
1703     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1704     pd->length          = ALIGN64(request_msg->length);
1705     pd->local_addr      = (uint64_t) msg_data;
1706     pd->remote_addr     = request_msg->source_addr;
1707     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1708     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1709     pd->rdma_mode       = 0;
1710     pd->amo_cmd         = 0;
1711
1712     //memory registration successful
1713     if(status == GNI_RC_SUCCESS)
1714     {
1715         pd->local_mem_hndl  = msg_mem_hndl;
1716         CMI_GNI_LOCK
1717         if(pd->type == GNI_POST_RDMA_GET) 
1718             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1719         else
1720             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1721         CMI_GNI_UNLOCK
1722     }else
1723     {
1724         SetMemHndlZero(pd->local_mem_hndl);
1725     }
1726     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1727     {
1728         MallocRdmaRequest(rdma_request_msg);
1729         rdma_request_msg->next = 0;
1730         rdma_request_msg->destNode = inst_id;
1731         rdma_request_msg->pd = pd;
1732         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1733     }else {
1734         GNI_RC_CHECK("AFter posting", status);
1735     }
1736 #endif
1737 }
1738
1739 static void PumpLocalRdmaTransactions()
1740 {
1741     gni_cq_entry_t          ev;
1742     gni_return_t            status;
1743     uint64_t                type, inst_id;
1744     gni_post_descriptor_t   *tmp_pd;
1745     MSG_LIST                *ptr;
1746     CONTROL_MSG             *ack_msg_tmp;
1747     uint8_t                 msg_tag;
1748 #ifdef CMK_DIRECT
1749     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1750 #endif
1751     SMSG_QUEUE         *queue = &smsg_queue;
1752
1753     while(1) {
1754         CMI_GNI_LOCK 
1755         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1756         CMI_GNI_UNLOCK
1757         if(status != GNI_RC_SUCCESS) break;
1758         
1759         type = GNI_CQ_GET_TYPE(ev);
1760         if (type == GNI_CQ_EVENT_TYPE_POST)
1761         {
1762             inst_id     = GNI_CQ_GET_INST_ID(ev);
1763 #if PRINT_SYH
1764             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1765 #endif
1766             CMI_GNI_LOCK
1767             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1768             CMI_GNI_UNLOCK
1769 #ifdef CMK_DIRECT
1770             if(tmp_pd->amo_cmd == 1)
1771             {
1772                 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1773                 cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
1774                 cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
1775                 cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
1776             }
1777             else{
1778                 MallocControlMsg(ack_msg_tmp);
1779                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1780                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1781             }
1782 #else
1783             MallocControlMsg(ack_msg_tmp);
1784             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1785             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1786 #endif
1787             ////Message is sent, free message , put is not used now
1788             switch (tmp_pd->type) {
1789 #if CMK_PERSISTENT_COMM
1790             case GNI_POST_RDMA_PUT:
1791 #if ! USE_LRTS_MEMPOOL
1792                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1793 #endif
1794             case GNI_POST_FMA_PUT:
1795                 CmiFree((void *)tmp_pd->local_addr);
1796                 msg_tag = PUT_DONE_TAG;
1797                 break;
1798 #endif
1799 #ifdef CMK_DIRECT
1800             case GNI_POST_RDMA_PUT:
1801             case GNI_POST_FMA_PUT:
1802                 //sender ACK to receiver to trigger it is done
1803                 msg_tag = DIRECT_PUT_DONE_TAG;
1804                 break;
1805 #endif
1806             case GNI_POST_RDMA_GET:
1807             case GNI_POST_FMA_GET:
1808 #if  ! USE_LRTS_MEMPOOL
1809                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1810                 msg_tag = ACK_TAG;  
1811 #else
1812                 ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
1813                 if(ack_msg_tmp->seq_id > 0)      // BIG_MSG
1814                 {
1815                     msg_tag = BIG_MSG_TAG; 
1816                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1817                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1818                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1819                 } 
1820                 else
1821                 {
1822                     msg_tag = ACK_TAG;  
1823                     ack_msg_tmp->dest_addr = tmp_pd->local_addr;
1824                 }
1825                 ack_msg_tmp->length = tmp_pd->length;
1826                 ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1827 #endif
1828                 break;
1829             default:
1830                 CmiPrintf("type=%d\n", tmp_pd->type);
1831                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1832             }
1833 #if CMK_DIRECT
1834             if(tmp_pd->amo_cmd == 1)
1835                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1836             else
1837 #endif
1838                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
1839             if(status == GNI_RC_SUCCESS)
1840             {
1841 #if CMK_DIRECT
1842                 if(tmp_pd->amo_cmd == 1)
1843                     free(cmk_direct_done_msg); 
1844                 else
1845 #endif
1846                     FreeControlMsg(ack_msg_tmp);
1847
1848             }
1849 #if CMK_PERSISTENT_COMM
1850             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1851 #endif
1852             {
1853                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1854 #if PRINT_SYH
1855                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1856 #endif
1857 #if CMK_SMP_TRACE_COMMTHREAD
1858                     START_EVENT();
1859 #endif
1860                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1861                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1862 #if MACHINE_DEBUG_LOG
1863                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1864                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1865                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1866 #endif
1867 #if CMK_SMP_TRACE_COMMTHREAD
1868                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1869 #endif
1870                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1871                 }else if(msg_tag == BIG_MSG_TAG){
1872                     void *msg = (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1873                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1874                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1875 #if CMK_SMP_TRACE_COMMTHREAD
1876                         START_EVENT();
1877 #endif
1878 #if PRINT_SYH
1879                         printf("Pipeline msg done [%d]\n", myrank);
1880 #endif
1881 #if CMK_SMP_TRACE_COMMTHREAD
1882                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1883 #endif
1884                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1885                     }
1886                 }
1887             }
1888             FreePostDesc(tmp_pd);
1889         }
1890     } //end while
1891     if(status == GNI_RC_ERROR_RESOURCE)
1892     {
1893         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
1894         GNI_RC_CHECK("Smsg_tx_cq full", status);
1895     }
1896 }
1897
1898 static void  SendRdmaMsg()
1899 {
1900     gni_return_t            status = GNI_RC_SUCCESS;
1901     gni_mem_handle_t        msg_mem_hndl;
1902     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
1903     RDMA_REQUEST            *pre = 0;
1904     int i, register_size = 0;
1905     void                    *msg;
1906 #if CMK_SMP
1907     int len = PCQueueLength(sendRdmaBuf);
1908     for (i=0; i<len; i++)
1909     {
1910         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
1911         if (ptr == NULL) break;
1912 #else
1913     ptr = sendRdmaBuf;
1914     while (ptr!=0)
1915     {
1916 #endif 
1917         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1918         gni_post_descriptor_t *pd = ptr->pd;
1919         status = GNI_RC_SUCCESS;
1920         
1921         if(pd->cqwrite_value == 0)
1922         {
1923             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
1924             {
1925                 msg = (void*)(pd->local_addr);
1926                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1927                 if(status == GNI_RC_SUCCESS)
1928                 {
1929                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1930                 }
1931             }else
1932             {
1933                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1934             }
1935             if(NoMsgInRecv( (void*)(pd->local_addr)))
1936                 register_size = GetMempoolsize((void*)(pd->local_addr));
1937             else
1938                 register_size = 0;
1939         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
1940         {
1941             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
1942         }
1943         if(status == GNI_RC_SUCCESS)        //mem register good
1944         {
1945             CMI_GNI_LOCK
1946             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1947                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1948             else
1949                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1950             CMI_GNI_UNLOCK
1951             if(status == GNI_RC_SUCCESS)    //post good
1952             {
1953 #if !CMK_SMP
1954                 tmp_ptr = ptr;
1955                 if(pre != 0) {
1956                     pre->next = ptr->next;
1957                 }
1958                 else {
1959                     sendRdmaBuf = ptr->next;
1960                 }
1961                 ptr = ptr->next;
1962                 FreeRdmaRequest(tmp_ptr);
1963 #endif
1964                 if(pd->cqwrite_value == 0)
1965                 {
1966                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
1967                 }
1968 #if MACHINE_DEBUG_LOG
1969                 buffered_recv_msg += register_size;
1970                 MACHSTATE(8, "GO request from buffered\n"); 
1971 #endif
1972             }else           // cannot post
1973             {
1974 #if CMK_SMP
1975                 PCQueuePush(sendRdmaBuf, (char*)ptr);
1976 #else
1977                 pre = ptr;
1978                 ptr = ptr->next;
1979 #endif
1980                 break;
1981             }
1982         } else          //memory registration fails
1983         {
1984 #if CMK_SMP
1985             PCQueuePush(sendRdmaBuf, (char*)ptr);
1986 #else
1987             pre = ptr;
1988             ptr = ptr->next;
1989 #endif
1990         }
1991     } //end while
1992 #if ! CMK_SMP
1993     if(ptr == 0)
1994         sendRdmaTail = pre;
1995 #endif
1996 }
1997
1998 // return 1 if all messages are sent
1999 static int SendBufferMsg(SMSG_QUEUE *queue)
2000 {
2001     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
2002     CONTROL_MSG         *control_msg_tmp;
2003     gni_return_t        status;
2004     int                 done = 1;
2005     int                 register_size;
2006     void                *register_addr;
2007     int                 index_previous = -1;
2008     int                 index = queue->smsg_head_index;
2009 #if CMI_EXERT_SEND_CAP
2010     int                 sent_cnt = 0;
2011 #endif
2012
2013 #if !CMK_SMP
2014     index = queue->smsg_head_index;
2015 #else
2016     index = 0;
2017 #endif
2018 #if CMK_SMP
2019     while(index <mysize)
2020     {
2021         int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
2022         for (i=0; i<len; i++) 
2023         {
2024             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
2025             if (ptr == 0) break;
2026 #else
2027     while(index != -1)
2028     {
2029         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2030         pre = 0;
2031         while(ptr != 0)
2032         {
2033 #endif
2034             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2035             status = GNI_RC_ERROR_RESOURCE;
2036             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2037                 /* connection not exists yet */
2038             }
2039             else
2040             switch(ptr->tag)
2041             {
2042             case SMALL_DATA_TAG:
2043                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2044                 if(status == GNI_RC_SUCCESS)
2045                 {
2046                     CmiFree(ptr->msg);
2047                 }
2048                 break;
2049             case LMSG_INIT_TAG:
2050                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2051                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2052                 break;
2053             case   ACK_TAG:
2054             case   BIG_MSG_TAG:
2055                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
2056                 if(status == GNI_RC_SUCCESS)
2057                 {
2058                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2059                 }
2060                 break;
2061 #ifdef CMK_DIRECT
2062             case DIRECT_PUT_DONE_TAG:
2063                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2064                 if(status == GNI_RC_SUCCESS)
2065                 {
2066                     free((CMK_DIRECT_HEADER*)ptr->msg);
2067                 }
2068                 break;
2069
2070 #endif
2071             default:
2072                 printf("Weird tag\n");
2073                 CmiAbort("should not happen\n");
2074             }       // end switch
2075             if(status == GNI_RC_SUCCESS)
2076             {
2077 #if !CMK_SMP
2078                 tmp_ptr = ptr;
2079                 if(pre)
2080                 {
2081                     ptr = pre ->next = ptr->next;
2082                 }else
2083                 {
2084                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2085                 }
2086                 FreeMsgList(tmp_ptr);
2087 #else
2088                 FreeMsgList(ptr);
2089 #endif
2090 #if PRINT_SYH
2091                 buffered_smsg_counter--;
2092                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2093 #endif
2094 #if CMI_EXERT_SEND_CAP
2095                 sent_cnt++;
2096                 if(sent_cnt == SEND_CAP)
2097                     break;
2098 #endif
2099             }else {
2100 #if CMK_SMP
2101                 PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
2102 #else
2103                 pre = ptr;
2104                 ptr=ptr->next;
2105 #endif
2106                 done = 0;
2107                 if(status == GNI_RC_ERROR_RESOURCE)
2108                     break;
2109             } 
2110         } //end while
2111 #if !CMK_SMP
2112         if(ptr == 0)
2113             queue->smsg_msglist_index[index].tail = pre;
2114         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2115         {
2116             if(index_previous != -1)
2117                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2118             else
2119                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2120         }else
2121         {index_previous = index;
2122         }
2123         index = queue->smsg_msglist_index[index].next;
2124 #else
2125         index++;
2126 #endif
2127
2128 #if CMI_EXERT_SEND_CAP
2129         if(sent_cnt == SEND_CAP)
2130                 break;
2131 #endif
2132     }   // end pooling for all cores
2133     return done;
2134 }
2135
2136 static void ProcessDeadlock();
2137 void LrtsAdvanceCommunication(int whileidle)
2138 {
2139     /*  Receive Msg first */
2140 #if CMK_SMP_TRACE_COMMTHREAD
2141     double startT, endT;
2142 #endif
2143     if (useDynamicSMSG && whileidle)
2144     {
2145 #if CMK_SMP_TRACE_COMMTHREAD
2146         startT = CmiWallTimer();
2147 #endif
2148         PumpDatagramConnection();
2149 #if CMK_SMP_TRACE_COMMTHREAD
2150         endT = CmiWallTimer();
2151         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2152 #endif
2153     }
2154
2155 #if CMK_SMP_TRACE_COMMTHREAD
2156     startT = CmiWallTimer();
2157 #endif
2158     PumpNetworkSmsg();
2159     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2160 #if CMK_SMP_TRACE_COMMTHREAD
2161     endT = CmiWallTimer();
2162     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2163 #endif
2164
2165 #if CMK_SMP_TRACE_COMMTHREAD
2166     startT = CmiWallTimer();
2167 #endif
2168     PumpLocalRdmaTransactions();
2169     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2170 #if CMK_SMP_TRACE_COMMTHREAD
2171     endT = CmiWallTimer();
2172     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2173 #endif
2174     /* Send buffered Message */
2175 #if CMK_SMP_TRACE_COMMTHREAD
2176     startT = CmiWallTimer();
2177 #endif
2178 #if CMK_USE_OOB
2179     if (SendBufferMsg(&smsg_oob_queue) == 1)
2180 #endif
2181     SendBufferMsg(&smsg_queue);
2182     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2183 #if CMK_SMP_TRACE_COMMTHREAD
2184     endT = CmiWallTimer();
2185     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2186 #endif
2187
2188 #if CMK_SMP_TRACE_COMMTHREAD
2189     startT = CmiWallTimer();
2190 #endif
2191     SendRdmaMsg();
2192     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2193 #if CMK_SMP_TRACE_COMMTHREAD
2194     endT = CmiWallTimer();
2195     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2196 #endif
2197
2198 #if CMK_SMP
2199     if (_detected_hang)  ProcessDeadlock();
2200 #endif
2201 }
2202
2203 /* useDynamicSMSG */
2204 static void _init_dynamic_smsg()
2205 {
2206     gni_return_t status;
2207     uint32_t     vmdh_index = -1;
2208     int i;
2209
2210     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2211     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2212     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2213     for(i=0; i<mysize; i++) {
2214         smsg_connected_flag[i] = 0;
2215         smsg_attr_vector_local[i] = NULL;
2216         smsg_attr_vector_remote[i] = NULL;
2217     }
2218     if(mysize <=512)
2219     {
2220         SMSG_MAX_MSG = 4096;
2221     }else if (mysize <= 4096)
2222     {
2223         SMSG_MAX_MSG = 4096/mysize * 1024;
2224     }else if (mysize <= 16384)
2225     {
2226         SMSG_MAX_MSG = 512;
2227     }else {
2228         SMSG_MAX_MSG = 256;
2229     }
2230
2231     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2232     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2233     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2234     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2235     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2236
2237     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2238     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2239     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2240     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2241     mailbox_list->offset = 0;
2242     mailbox_list->next = 0;
2243     
2244     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2245         mailbox_list->size, smsg_rx_cqh,
2246         GNI_MEM_READWRITE,   
2247         vmdh_index,
2248         &(mailbox_list->mem_hndl));
2249     GNI_RC_CHECK("MEMORY registration for smsg", status);
2250
2251     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2252     GNI_RC_CHECK("Unbound EP", status);
2253     
2254     alloc_smsg_attr(&send_smsg_attr);
2255
2256     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2257     GNI_RC_CHECK("post unbound datagram", status);
2258
2259       /* always pre-connect to proc 0 */
2260     //if (myrank != 0) connect_to(0);
2261 }
2262
2263 static void _init_static_smsg()
2264 {
2265     gni_smsg_attr_t      *smsg_attr;
2266     gni_smsg_attr_t      remote_smsg_attr;
2267     gni_smsg_attr_t      *smsg_attr_vec;
2268     gni_mem_handle_t     my_smsg_mdh_mailbox;
2269     int      ret, i;
2270     gni_return_t status;
2271     uint32_t              vmdh_index = -1;
2272     mdh_addr_t            base_infor;
2273     mdh_addr_t            *base_addr_vec;
2274     char *env;
2275
2276     if(mysize <=512)
2277     {
2278         SMSG_MAX_MSG = 1024;
2279     }else if (mysize <= 4096)
2280     {
2281         SMSG_MAX_MSG = 1024;
2282     }else if (mysize <= 16384)
2283     {
2284         SMSG_MAX_MSG = 512;
2285     }else {
2286         SMSG_MAX_MSG = 256;
2287     }
2288     
2289     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2290     if (env) SMSG_MAX_MSG = atoi(env);
2291     CmiAssert(SMSG_MAX_MSG > 0);
2292
2293     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2294     
2295     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2296     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2297     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2298     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2299     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2300     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2301     CmiAssert(ret == 0);
2302     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2303     
2304     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2305             smsg_memlen*(mysize), smsg_rx_cqh,
2306             GNI_MEM_READWRITE,   
2307             vmdh_index,
2308             &my_smsg_mdh_mailbox);
2309     register_memory_size += smsg_memlen*(mysize);
2310     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2311
2312     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2313     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2314
2315     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2316     base_infor.mdh =  my_smsg_mdh_mailbox;
2317     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2318
2319     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2320  
2321     for(i=0; i<mysize; i++)
2322     {
2323         if(i==myrank)
2324             continue;
2325         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2326         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2327         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2328         smsg_attr[i].mbox_offset = i*smsg_memlen;
2329         smsg_attr[i].buff_size = smsg_memlen;
2330         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2331         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2332     }
2333
2334     for(i=0; i<mysize; i++)
2335     {
2336         if (myrank == i) continue;
2337
2338         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2339         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2340         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2341         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2342         remote_smsg_attr.buff_size = smsg_memlen;
2343         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2344         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2345
2346         /* initialize the smsg channel */
2347         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2348         GNI_RC_CHECK("SMSG Init", status);
2349     } //end initialization
2350
2351     free(base_addr_vec);
2352     free(smsg_attr);
2353
2354     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2355     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2356
2357
2358 inline
2359 static void _init_send_queue(SMSG_QUEUE *queue)
2360 {
2361      int i;
2362      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2363      for(i =0; i<mysize; i++)
2364      {
2365         queue->smsg_msglist_index[i].next = -1;
2366 #if CMK_SMP
2367         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2368 #else
2369         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2370 #endif
2371         
2372      }
2373      queue->smsg_head_index = -1;
2374 }
2375
2376 inline
2377 static void _init_smsg()
2378 {
2379     if(mysize > 1) {
2380         if (useDynamicSMSG)
2381             _init_dynamic_smsg();
2382         else
2383             _init_static_smsg();
2384     }
2385
2386     _init_send_queue(&smsg_queue);
2387 #if CMK_USE_OOB
2388     _init_send_queue(&smsg_oob_queue);
2389 #endif
2390 }
2391
2392 static void _init_static_msgq()
2393 {
2394     gni_return_t status;
2395     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2396     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2397     msgq_attrs.smsg_q_sz = 1;
2398     msgq_attrs.rcv_pool_sz = 1;
2399     msgq_attrs.num_msgq_eps = 2;
2400     msgq_attrs.nloc_insts = 8;
2401     msgq_attrs.modes = 0;
2402     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2403
2404     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2405     GNI_RC_CHECK("MSGQ Init", status);
2406
2407
2408 }
2409
2410 #if CMK_SMP && STEAL_MEMPOOL
2411 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2412 {
2413     void *pool = NULL;
2414     int i, k;
2415     // check other ranks
2416     for (k=0; k<CmiMyNodeSize()+1; k++) {
2417         i = (CmiMyRank()+k)%CmiMyNodeSize();
2418         if (i==CmiMyRank()) continue;
2419         mempool_type *mptr = CpvAccessOther(mempool, i);
2420         CmiLock(mptr->mempoolLock);
2421         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2422         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2423             CmiUnlock(mptr->mempoolLock);
2424             continue;
2425         }
2426         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2427         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2428             /* search in the free list */
2429           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2430           mempool_header *current = free_header;
2431           while (current) {
2432             if (current->next_free == (char*)header-(char*)mptr) break;
2433             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2434           }
2435           if (current == NULL) {         /* not found in free list */
2436             CmiUnlock(mptr->mempoolLock);
2437             continue;
2438           }
2439 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2440             /* search the previous memblock, and remove the tail */
2441           mempool_block *ptr = (mempool_block *)mptr;
2442           while (ptr) {
2443             if (ptr->memblock_next == mptr->memblock_tail) break;
2444             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2445           }
2446           CmiAssert(ptr!=NULL);
2447           ptr->memblock_next = 0;
2448           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2449
2450             /* remove memblock from the free list */
2451           current->next_free = header->next_free;
2452           if (header == free_header) mptr->freelist_head = header->next_free;
2453
2454           CmiUnlock(mptr->mempoolLock);
2455
2456           pool = (void*)tail;
2457           *mem_hndl = tail->mem_hndl;
2458           *size = tail->size;
2459           return pool;
2460         }
2461         CmiUnlock(mptr->mempoolLock);
2462     }
2463
2464       /* steal failed, deregister and free memblock now */
2465     int freed = 0;
2466     for (k=0; k<CmiMyNodeSize()+1; k++) {
2467         i = (CmiMyRank()+k)%CmiMyNodeSize();
2468         mempool_type *mptr = CpvAccessOther(mempool, i);
2469         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2470
2471         mempool_block *mempools_head = &(mptr->mempools_head);
2472         mempool_block *current = mempools_head;
2473         mempool_block *prev = NULL;
2474
2475         while (current) {
2476           int isfree = 0;
2477           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2478 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2479           mempool_header *cur = free_header;
2480           mempool_header *header;
2481           if (current != mempools_head) {
2482             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2483              /* search in free list */
2484             if (header->size == current->size - sizeof(mempool_block)) {
2485               cur = free_header;
2486               while (cur) {
2487                 if (cur->next_free == (char*)header-(char*)mptr) break;
2488                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2489               }
2490               if (cur != NULL) isfree = 1;
2491             }
2492           }
2493           if (isfree) {
2494               /* remove from free list */
2495             cur->next_free = header->next_free;
2496             if (header == free_header) mptr->freelist_head = header->next_free;
2497              // deregister
2498             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2499             GNI_RC_CHECK("steal Mempool de-register", status);
2500             mempool_block *ptr = current;
2501             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2502             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2503 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2504             freed += ptr->size;
2505             free(ptr);
2506              // try now
2507             if (freed > *size) {
2508               if (pool == NULL) {
2509                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2510                 CmiAssert(ret == 0);
2511               }
2512               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2513               if (status == GNI_RC_SUCCESS) {
2514                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2515 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2516                 return pool;
2517               }
2518 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2519             }
2520           }
2521           else {
2522              prev = current;
2523              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2524           }
2525         }
2526
2527         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2528     }
2529       /* still no luck registering pool */
2530     if (pool) free(pool);
2531     return NULL;
2532 }
2533 #endif
2534
2535 static long long int total_mempool_size = 0;
2536 static long long int total_mempool_calls = 0;
2537
2538 #if USE_LRTS_MEMPOOL
2539 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2540 {
2541     void *pool;
2542     int ret;
2543
2544     int default_size =  expand_flag? _expand_mem : _mempool_size;
2545     if (*size < default_size) *size = default_size;
2546     total_mempool_size += *size;
2547     total_mempool_calls += 1;
2548     if (*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) {
2549         printf("Error: A mempool block with size %d is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX.", *size);
2550         CmiAbort("alloc_mempool_block");
2551     }
2552     ret = posix_memalign(&pool, ALIGNBUF, *size);
2553     if (ret != 0) {
2554 #if CMK_SMP && STEAL_MEMPOOL
2555       pool = steal_mempool_block(size, mem_hndl);
2556       if (pool != NULL) return pool;
2557 #endif
2558       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2559       if (ret == ENOMEM)
2560         CmiAbort("alloc_mempool_block: out of memory.");
2561       else
2562         CmiAbort("alloc_mempool_block: posix_memalign failed");
2563     }
2564     SetMemHndlZero((*mem_hndl));
2565     
2566     return pool;
2567 }
2568
2569 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2570 {
2571     if(!(IsMemHndlZero(mem_hndl)))
2572     {
2573         gni_return_t status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2574         GNI_RC_CHECK("free_mempool_block Mempool de-register", status);
2575     }
2576     free(ptr);
2577 }
2578 #endif
2579
2580 void LrtsPreCommonInit(int everReturn){
2581 #if USE_LRTS_MEMPOOL
2582     CpvInitialize(mempool_type*, mempool);
2583     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block);
2584     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2585 #endif
2586 }
2587
2588
2589 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2590 {
2591     register int            i;
2592     int                     rc;
2593     int                     device_id = 0;
2594     unsigned int            remote_addr;
2595     gni_cdm_handle_t        cdm_hndl;
2596     gni_return_t            status = GNI_RC_SUCCESS;
2597     uint32_t                vmdh_index = -1;
2598     uint8_t                 ptag;
2599     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2600     int                     first_spawned;
2601     int                     physicalID;
2602     char                   *env;
2603
2604     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2605     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2606     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2607    
2608     status = PMI_Init(&first_spawned);
2609     GNI_RC_CHECK("PMI_Init", status);
2610
2611     status = PMI_Get_size(&mysize);
2612     GNI_RC_CHECK("PMI_Getsize", status);
2613
2614     status = PMI_Get_rank(&myrank);
2615     GNI_RC_CHECK("PMI_getrank", status);
2616
2617     //physicalID = CmiPhysicalNodeID(myrank);
2618     
2619     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2620
2621     *myNodeID = myrank;
2622     *numNodes = mysize;
2623   
2624 #if MULTI_THREAD_SEND
2625     /* Currently, we only consider the case that comm. thread will only recv msgs */
2626     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2627 #endif
2628
2629     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2630     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2631     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2632
2633     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2634     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2635     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2636
2637     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
2638     if (env) useDynamicSMSG = 1;
2639     if (!useDynamicSMSG)
2640       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2641     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2642     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2643     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2644     
2645     if(myrank == 0)
2646     {
2647         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2648         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2649     }
2650 #ifdef USE_ONESIDED
2651     onesided_init(NULL, &onesided_hnd);
2652
2653     // this is a GNI test, so use the libonesided bypass functionality
2654     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2655     local_addr = gniGetNicAddress();
2656 #else
2657     ptag = get_ptag();
2658     cookie = get_cookie();
2659 #if 0
2660     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2661 #endif
2662     //Create and attach to the communication  domain */
2663     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2664     GNI_RC_CHECK("GNI_CdmCreate", status);
2665     //* device id The device id is the minor number for the device
2666     //that is assigned to the device by the system when the device is created.
2667     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2668     //where X is the device number 0 default 
2669     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2670     GNI_RC_CHECK("GNI_CdmAttach", status);
2671     local_addr = get_gni_nic_address(0);
2672 #endif
2673     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2674     _MEMCHECK(MPID_UGNI_AllAddr);
2675     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2676     /* create the local completion queue */
2677     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2678     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2679     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2680     
2681     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2682     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2683     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2684
2685     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2686     GNI_RC_CHECK("Create CQ (rx)", status);
2687     
2688     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2689     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2690     
2691     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2692     //GNI_RC_CHECK("Create BTE CQ", status);
2693
2694     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2695     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2696     _MEMCHECK(ep_hndl_array);
2697 #if CMK_SMP && !COMM_THREAD_SEND
2698     tx_cq_lock  = CmiCreateLock();
2699     rx_cq_lock  = CmiCreateLock();
2700 #endif
2701     for (i=0; i<mysize; i++) {
2702         if(i == myrank) continue;
2703         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2704         GNI_RC_CHECK("GNI_EpCreate ", status);   
2705         remote_addr = MPID_UGNI_AllAddr[i];
2706         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2707         GNI_RC_CHECK("GNI_EpBind ", status);   
2708     }
2709
2710     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2711     _init_smsg();
2712     PMI_Barrier();
2713
2714 #if     USE_LRTS_MEMPOOL
2715     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
2716     if (env) {
2717         _totalmem = CmiReadSize(env);
2718         if (myrank == 0)
2719             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
2720     }
2721
2722     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
2723     if (env) _mempool_size = CmiReadSize(env);
2724     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
2725         _mempool_size = CmiReadSize(env);
2726
2727
2728     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2729     if (env) {
2730         MAX_REG_MEM = CmiReadSize(env);
2731         user_set_flag = 1;
2732     }
2733     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
2734         MAX_REG_MEM = CmiReadSize(env);
2735         user_set_flag = 1;
2736     }
2737
2738     env = getenv("CHARM_UGNI_SEND_MAX");
2739     if (env) {
2740         MAX_BUFF_SEND = CmiReadSize(env);
2741         user_set_flag = 1;
2742     }
2743     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
2744         MAX_BUFF_SEND = CmiReadSize(env);
2745         user_set_flag = 1;
2746     }
2747
2748     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2749     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2750
2751     if (myrank==0) {
2752         printf("Charm++> memory pool init size: %1.fMB\n", _mempool_size/1024.0/1024);
2753         printf("Charm++> memory pool max size: %1.fMB, max for send: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
2754         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
2755             /* memblock can expand to BIG_MSG * 2 size */
2756             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
2757             CmiAbort("mempool maximum size is too small. \n");
2758         }
2759 #if CMK_SMP && MULTI_THREAD_SEND
2760         printf("Charm++> worker thread sending messages\n");
2761 #elif CMK_SMP && COMM_THREAD_SEND
2762         printf("Charm++> only comm thread send/recv messages\n");
2763 #endif
2764     }
2765
2766 #endif     /* end of USE_LRTS_MEMPOOL */
2767
2768     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
2769     if (env) {
2770         BIG_MSG = CmiReadSize(env);
2771         if (BIG_MSG < ONE_SEG)
2772           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
2773     }
2774     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
2775     if (env) {
2776         BIG_MSG_PIPELINE = atoi(env);
2777     }
2778
2779     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
2780     if (env) _checkProgress = 0;
2781     if (mysize == 1) _checkProgress = 0;
2782
2783     /* init DMA buffer for medium message */
2784
2785     //_init_DMA_buffer();
2786     
2787     free(MPID_UGNI_AllAddr);
2788 #if CMK_SMP
2789     sendRdmaBuf = PCQueueCreate();
2790 #else
2791     sendRdmaBuf = 0;
2792 #endif
2793 #if MACHINE_DEBUG_LOG
2794     char ln[200];
2795     sprintf(ln,"debugLog.%d",myrank);
2796     debugLog=fopen(ln,"w");
2797 #endif
2798
2799 }
2800
2801 void* LrtsAlloc(int n_bytes, int header)
2802 {
2803     void *ptr;
2804 #if 0
2805     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2806 #endif
2807     if(n_bytes <= SMSG_MAX_MSG)
2808     {
2809         int totalsize = n_bytes+header;
2810         ptr = malloc(totalsize);
2811     }
2812     else {
2813         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2814 #if     USE_LRTS_MEMPOOL
2815         n_bytes = ALIGN64(n_bytes);
2816         if(n_bytes < BIG_MSG)
2817         {
2818             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2819             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2820         }else 
2821         {
2822             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2823             ptr = res + ALIGNBUF - header;
2824
2825         }
2826 #else
2827         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2828         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2829         ptr = res + ALIGNBUF - header;
2830 #endif
2831     }
2832     return ptr;
2833 }
2834
2835 void  LrtsFree(void *msg)
2836 {
2837     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2838     if (size <= SMSG_MAX_MSG)
2839       free(msg);
2840     else if(size>=BIG_MSG)
2841     {
2842         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2843
2844     }else
2845     {
2846 #if     USE_LRTS_MEMPOOL
2847 #if CMK_SMP
2848         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2849 #else
2850         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2851 #endif
2852 #else
2853         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2854 #endif
2855     }
2856 }
2857
2858 void LrtsExit()
2859 {
2860     /* free memory ? */
2861 #if USE_LRTS_MEMPOOL
2862     mempool_destroy(CpvAccess(mempool));
2863 #endif
2864     PMI_Finalize();
2865     exit(0);
2866 }
2867
2868 void LrtsDrainResources()
2869 {
2870     if(mysize == 1) return;
2871     while (
2872 #if CMK_USE_OOB
2873            !SendBufferMsg(&smsg_oob_queue) ||
2874 #endif
2875            !SendBufferMsg(&smsg_queue))
2876     {
2877         if (useDynamicSMSG)
2878             PumpDatagramConnection();
2879         PumpNetworkSmsg();
2880         PumpLocalRdmaTransactions();
2881         SendRdmaMsg();
2882     }
2883     PMI_Barrier();
2884 }
2885
2886 void LrtsAbort(const char *message) {
2887     printf("CmiAbort is calling on PE:%d\n", myrank);
2888     CmiPrintStackTrace(0);
2889     PMI_Abort(-1, message);
2890 }
2891
2892 /**************************  TIMER FUNCTIONS **************************/
2893 #if CMK_TIMER_USE_SPECIAL
2894 /* MPI calls are not threadsafe, even the timer on some machines */
2895 static CmiNodeLock  timerLock = 0;
2896 static int _absoluteTime = 0;
2897 static int _is_global = 0;
2898 static struct timespec start_ts;
2899
2900 inline int CmiTimerIsSynchronized() {
2901     return 0;
2902 }
2903
2904 inline int CmiTimerAbsolute() {
2905     return _absoluteTime;
2906 }
2907
2908 double CmiStartTimer() {
2909     return 0.0;
2910 }
2911
2912 double CmiInitTime() {
2913     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
2914 }
2915
2916 void CmiTimerInit(char **argv) {
2917     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2918     if (_absoluteTime && CmiMyPe() == 0)
2919         printf("Charm++> absolute  timer is used\n");
2920     
2921     _is_global = CmiTimerIsSynchronized();
2922
2923
2924     if (_is_global) {
2925         if (CmiMyRank() == 0) {
2926             clock_gettime(CLOCK_MONOTONIC, &start_ts);
2927         }
2928     } else { /* we don't have a synchronous timer, set our own start time */
2929         CmiBarrier();
2930         CmiBarrier();
2931         CmiBarrier();
2932         clock_gettime(CLOCK_MONOTONIC, &start_ts);
2933     }
2934     CmiNodeAllBarrier();          /* for smp */
2935 }
2936
2937 /**
2938  * Since the timerLock is never created, and is
2939  * always NULL, then all the if-condition inside
2940  * the timer functions could be disabled right
2941  * now in the case of SMP.
2942  */
2943 double CmiTimer(void) {
2944     struct timespec now_ts;
2945     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2946     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2947         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2948 }
2949
2950 double CmiWallTimer(void) {
2951     struct timespec now_ts;
2952     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2953     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2954         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
2955 }
2956
2957 double CmiCpuTimer(void) {
2958     struct timespec now_ts;
2959     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2960     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2961         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2962 }
2963
2964 #endif
2965 /************Barrier Related Functions****************/
2966
2967 int CmiBarrier()
2968 {
2969     gni_return_t status;
2970
2971 #if CMK_SMP
2972     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2973     CmiNodeAllBarrier();
2974     if (CmiMyRank() == CmiMyNodeSize())
2975 #else
2976     if (CmiMyRank() == 0)
2977 #endif
2978     {
2979         /**
2980          *  The call of CmiBarrier is usually before the initialization
2981          *  of trace module of Charm++, therefore, the START_EVENT
2982          *  and END_EVENT are disabled here. -Chao Mei
2983          */
2984         /*START_EVENT();*/
2985         status = PMI_Barrier();
2986         GNI_RC_CHECK("PMI_Barrier", status);
2987         /*END_EVENT(10);*/
2988     }
2989     CmiNodeAllBarrier();
2990     return status;
2991
2992 }
2993 #if CMK_DIRECT
2994 #include "machine-cmidirect.c"
2995 #endif
2996 #if CMK_PERSISTENT_COMM
2997 #include "machine-persistent.c"
2998 #endif
2999
3000