13bc442ef5c55e19b427eb0d1e5b07fddc1dbc35
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     export CHARM_UGNI_MEMPOOL_SIZE=8M
12     export CHARM_UGNI_MEMPOOL_MAX=20M
13     export CHARM_UGNI_SEND_MAX=10M
14
15     CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
16     CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
17
18     other environment variables:
19
20     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes      # disable checking deadlock
21  */
22 /*@{*/
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <stdint.h>
27 #include <errno.h>
28 #include <malloc.h>
29 #include <unistd.h>
30
31 #include <gni_pub.h>
32 #include <pmi.h>
33 #include <time.h>
34
35 #include "converse.h"
36
37 #define PRINT_SYH  0
38
39 // Trace communication thread
40 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
41 #define TRACE_THRESHOLD     0.00005
42 #define CMI_MPI_TRACE_MOREDETAILED 0
43 #undef CMI_MPI_TRACE_USEREVENTS
44 #define CMI_MPI_TRACE_USEREVENTS 1
45 #else
46 #undef CMK_SMP_TRACE_COMMTHREAD
47 #define CMK_SMP_TRACE_COMMTHREAD 0
48 #endif
49
50 #define CMK_TRACE_COMMOVERHEAD 0
51 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
52 #undef CMI_MPI_TRACE_USEREVENTS
53 #define CMI_MPI_TRACE_USEREVENTS 1
54 #else
55 #undef CMK_TRACE_COMMOVERHEAD
56 #define CMK_TRACE_COMMOVERHEAD 0
57 #endif
58
59 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
60 CpvStaticDeclare(double, projTraceStart);
61 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
62 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
63 #else
64 #define  START_EVENT()
65 #define  END_EVENT(x)
66 #endif
67
68 #define CMI_EXERT_SEND_CAP      0
69 #define CMI_EXERT_RECV_CAP      0
70
71 #if CMI_EXERT_SEND_CAP
72 #define SEND_CAP 16
73 #endif
74
75 #if CMI_EXERT_RECV_CAP
76 #define RECV_CAP 2
77 #endif
78
79 static int _checkProgress = 1;             /* check deadlock */
80 static int _detected_hang = 0;
81
82 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
83
84 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
85
86 static int avg_smsg_connection = 32;
87 static int                 *smsg_connected_flag= 0;
88 static gni_smsg_attr_t     **smsg_attr_vector_local;
89 static gni_smsg_attr_t     **smsg_attr_vector_remote;
90 static gni_ep_handle_t     ep_hndl_unbound;
91 static gni_smsg_attr_t     send_smsg_attr;
92 static gni_smsg_attr_t     recv_smsg_attr;
93
94 typedef struct _dynamic_smsg_mailbox{
95    void     *mailbox_base;
96    int      size;
97    int      offset;
98    gni_mem_handle_t  mem_hndl;
99    struct      _dynamic_smsg_mailbox  *next;
100 }dynamic_smsg_mailbox_t;
101
102 static dynamic_smsg_mailbox_t  *mailbox_list;
103
104 #define REMOTE_EVENT                      0
105 #define USE_LRTS_MEMPOOL                  1
106
107 #if USE_LRTS_MEMPOOL
108
109 #define oneMB (1024ll*1024)
110 static CmiInt8 _mempool_size = 8*oneMB;
111 static CmiInt8 _expand_mem =  4*oneMB;
112
113 #endif
114
115 #if CMK_SMP
116 //Dynamic flow control about memory registration
117 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
118 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
119 #else
120 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
121 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
122 #endif
123
124 static CmiInt8 buffered_send_msg = 0;
125
126 #define BIG_MSG                  16*oneMB
127 #define ONE_SEG                  4*oneMB
128 #define BIG_MSG_PIPELINE         4
129
130 #if CMK_SMP
131 #define COMM_THREAD_SEND 1
132 #endif
133
134 int         rdma_id = 0;
135
136 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
137 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
138
139 #if PRINT_SYH
140 int         lrts_send_msg_id = 0;
141 int         lrts_local_done_msg = 0;
142 int         lrts_smsg_success = 0;
143 int         lrts_send_rdma_success = 0;
144 #endif
145
146 #include "machine.h"
147
148 #include "pcqueue.h"
149
150 #include "mempool.h"
151
152 #if CMK_PERSISTENT_COMM
153 #include "machine-persistent.h"
154 #endif
155
156 //#define  USE_ONESIDED 1
157 #ifdef USE_ONESIDED
158 //onesided implementation is wrong, since no place to restore omdh
159 #include "onesided.h"
160 onesided_hnd_t   onesided_hnd;
161 onesided_md_t    omdh;
162 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
163
164 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
165
166 #else
167 uint8_t   onesided_hnd, omdh;
168 #if REMOTE_EVENT
169 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size>= MAX_REG_MEM) { \
170                                                                                          status = GNI_RC_ERROR_NOMEM;} \
171         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
172                 if(status == GNI_RC_SUCCESS) register_memory_size += size;} 
173 #else
174 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
175       if(status == GNI_RC_SUCCESS) register_memory_size += size;
176 #endif
177 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size, status)  status = GNI_MemDeregister(nic_hndl, (mem_hndl)); \
178                                                                                 if( status == GNI_RC_SUCCESS) register_memory_size -= size; \
179                                                                                 else GNI_RC_CHECK(status, "deregister");
180 #endif
181
182 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
183 #define   DecreaseMsgInRecv(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)--
184 #define   IncreaseMsgInSend(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)++
185                                    //printf("++++[%d]%p   ----- %d\n", myrank, ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
186 #define   DecreaseMsgInSend(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)--
187                                      //printf("---[%d]%p   ----- %d\n", myrank,((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
188 #define   GetMempoolBlockPtr(x)  ((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr
189 #define   GetMempoolPtr(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mptr
190 #define   GetMempoolsize(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->size
191 #define   GetMemHndl(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mem_hndl
192 #define   GetMemHndlFromHeader(x) ((block_header*)x)->mem_hndl
193 #define   GetSizeFromHeader(x) ((block_header*)x)->size
194 #define   NoMsgInSend(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send == 0
195 #define   NoMsgInRecv(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv == 0
196 #define   NoMsgInFlight(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send + ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv  == 0
197 #define   IsMemHndlZero(x)  (x.qword1 == 0 && x.qword2 == 0)
198 #define   SetMemHndlZero(x)  x.qword1 = 0; x.qword2 = 0
199 #define   NotRegistered(x)  IsMemHndlZero(((block_header*)x)->mem_hndl)
200
201 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
202 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
203 #define CmiGetMsgSeq(m)  ((CmiMsgHeaderExt*)m)->seq
204 #define CmiSetMsgSeq(m, s)  ((((CmiMsgHeaderExt*)m)->seq) = (s))
205
206 #define ALIGNBUF                64
207
208 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
209 /* If SMSG is not used */
210
211 #define FMA_PER_CORE  1024
212 #define FMA_BUFFER_SIZE 1024
213
214 /* If SMSG is used */
215 static int  SMSG_MAX_MSG = 1024;
216 #define SMSG_MAX_CREDIT 72 
217
218 #define MSGQ_MAXSIZE       2048
219 /* large message transfer with FMA or BTE */
220 #define LRTS_GNI_RDMA_THRESHOLD  1024 
221
222 #if CMK_SMP
223 static int  REMOTE_QUEUE_ENTRIES=163840; 
224 static int LOCAL_QUEUE_ENTRIES=163840; 
225 #else
226 static int  REMOTE_QUEUE_ENTRIES=20480;
227 static int LOCAL_QUEUE_ENTRIES=20480; 
228 #endif
229
230 #define BIG_MSG_TAG  0x26
231 #define PUT_DONE_TAG      0x28
232 #define DIRECT_PUT_DONE_TAG      0x29
233 #define ACK_TAG           0x30
234 /* SMSG is data message */
235 #define SMALL_DATA_TAG          0x31
236 /* SMSG is a control message to initialize a BTE */
237 #define MEDIUM_HEAD_TAG         0x32
238 #define MEDIUM_DATA_TAG         0x33
239 #define LMSG_INIT_TAG           0x39 
240 #define VERY_LMSG_INIT_TAG      0x40 
241 #define VERY_LMSG_TAG           0x41 
242
243 #define DEBUG
244 #ifdef GNI_RC_CHECK
245 #undef GNI_RC_CHECK
246 #endif
247 #ifdef DEBUG
248 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
249 #else
250 #define GNI_RC_CHECK(msg,rc)
251 #endif
252
253 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
254 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
255
256 static int useStaticMSGQ = 0;
257 static int useStaticFMA = 0;
258 static int mysize, myrank;
259 gni_nic_handle_t      nic_hndl;
260
261 typedef struct {
262     gni_mem_handle_t mdh;
263     uint64_t addr;
264 } mdh_addr_t ;
265 // this is related to dynamic SMSG
266
267 typedef struct mdh_addr_list{
268     gni_mem_handle_t mdh;
269    void *addr;
270     struct mdh_addr_list *next;
271 }mdh_addr_list_t;
272
273 static unsigned int         smsg_memlen;
274 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
275 mdh_addr_t          setup_mem;
276 mdh_addr_t          *smsg_connection_vec = 0;
277 gni_mem_handle_t    smsg_connection_memhndl;
278 static int          smsg_expand_slots = 10;
279 static int          smsg_available_slot = 0;
280 static void         *smsg_mailbox_mempool = 0;
281 mdh_addr_list_t     *smsg_dynamic_list = 0;
282
283 static void             *smsg_mailbox_base;
284 gni_msgq_attr_t         msgq_attrs;
285 gni_msgq_handle_t       msgq_handle;
286 gni_msgq_ep_attr_t      msgq_ep_attrs;
287 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
288
289 /* =====Beginning of Declarations of Machine Specific Variables===== */
290 static int cookie;
291 static int modes = 0;
292 static gni_cq_handle_t       smsg_rx_cqh = NULL;
293 static gni_cq_handle_t       smsg_tx_cqh = NULL;
294 static gni_cq_handle_t       post_rx_cqh = NULL;
295 static gni_cq_handle_t       post_tx_cqh = NULL;
296 static gni_ep_handle_t       *ep_hndl_array;
297 #if CMK_SMP && !COMM_THREAD_SEND
298 static CmiNodeLock           *ep_lock_array;
299 static CmiNodeLock           tx_cq_lock; 
300 static CmiNodeLock           rx_cq_lock; 
301 #endif
302 typedef struct msg_list
303 {
304     uint32_t destNode;
305     uint32_t size;
306     void *msg;
307     uint8_t tag;
308 #if !CMK_SMP
309     struct msg_list *next;
310 #endif
311 }MSG_LIST;
312
313
314 typedef struct control_msg
315 {
316     uint64_t            source_addr;    /* address from the start of buffer  */
317     uint64_t            dest_addr;      /* address from the start of buffer */
318     //int                 source;         /* source rank */
319     int                 total_length;   /* total length */
320     int                 length;         /* length of this packet */
321     uint8_t             seq_id;         //big message   0 meaning single message
322     gni_mem_handle_t    source_mem_hndl;
323     struct control_msg *next;
324 }CONTROL_MSG;
325
326 #ifdef CMK_DIRECT
327 typedef struct{
328     void *recverBuf;
329     void (*callbackFnPtr)(void *);
330     void *callbackData;
331 }CMK_DIRECT_HEADER;
332 #endif
333 typedef struct  rmda_msg
334 {
335     int                   destNode;
336     gni_post_descriptor_t *pd;
337 #if !CMK_SMP
338     struct  rmda_msg      *next;
339 #endif
340 }RDMA_REQUEST;
341
342 #if CMK_SMP
343 PCQueue sendRdmaBuf;
344 typedef struct  msg_list_index
345 {
346     int         next;
347     PCQueue     sendSmsgBuf;
348 } MSG_LIST_INDEX;
349 #else
350 static RDMA_REQUEST        *sendRdmaBuf = 0;
351 static RDMA_REQUEST        *sendRdmaTail = 0;
352 typedef struct  msg_list_index
353 {
354     int         next;
355     MSG_LIST    *sendSmsgBuf;
356     MSG_LIST    *tail;
357 } MSG_LIST_INDEX;
358 #endif
359
360 /* reuse PendingMsg memory */
361 static CONTROL_MSG          *control_freelist=0;
362 static MSG_LIST             *msglist_freelist=0;
363
364 typedef struct smsg_queue
365 {
366     MSG_LIST_INDEX   *smsg_msglist_index;
367     int               smsg_head_index;
368 } SMSG_QUEUE;
369
370 SMSG_QUEUE                  smsg_queue;
371 SMSG_QUEUE                  smsg_oob_queue;
372
373 #if CMK_SMP
374
375 #define FreeMsgList(d)   free(d);
376 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
377
378 #else
379
380 #define FreeMsgList(d)  \
381   (d)->next = msglist_freelist;\
382   msglist_freelist = d;
383
384 #define MallocMsgList(d) \
385   d = msglist_freelist;\
386   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
387              _MEMCHECK(d);\
388   } else msglist_freelist = d->next; \
389   d->next =0;
390 #endif
391
392 #if CMK_SMP
393
394 #define FreeControlMsg(d)      free(d);
395 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
396
397 #else
398
399 #define FreeControlMsg(d)       \
400   (d)->next = control_freelist;\
401   control_freelist = d;
402
403 #define MallocControlMsg(d) \
404   d = control_freelist;\
405   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
406              _MEMCHECK(d);\
407   } else control_freelist = d->next;
408
409 #endif
410
411 static RDMA_REQUEST         *rdma_freelist = NULL;
412
413 #define FreeMediumControlMsg(d)       \
414   (d)->next = medium_control_freelist;\
415   medium_control_freelist = d;
416
417
418 #define MallocMediumControlMsg(d) \
419     d = medium_control_freelist;\
420     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
421     _MEMCHECK(d);\
422 } else mediumcontrol_freelist = d->next;
423
424 # if CMK_SMP
425 #define FreeRdmaRequest(d)       free(d);
426 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
427 #else
428
429 #define FreeRdmaRequest(d)       \
430   (d)->next = rdma_freelist;\
431   rdma_freelist = d;
432
433 #define MallocRdmaRequest(d) \
434   d = rdma_freelist;\
435   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
436              _MEMCHECK(d);\
437   } else rdma_freelist = d->next; \
438     d->next =0;
439 #endif
440
441 /* reuse gni_post_descriptor_t */
442 static gni_post_descriptor_t *post_freelist=0;
443
444 #if  !CMK_SMP
445 #define FreePostDesc(d)       \
446     (d)->next_descr = post_freelist;\
447     post_freelist = d;
448
449 #define MallocPostDesc(d) \
450   d = post_freelist;\
451   if (d==0) { \
452      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
453      _MEMCHECK(d);\
454   } else post_freelist = d->next_descr;
455 #else
456
457 #define FreePostDesc(d)     free(d);
458 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
459
460 #endif
461
462
463 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
464 static int      buffered_smsg_counter = 0;
465
466 /* SmsgSend return success but message sent is not confirmed by remote side */
467 static MSG_LIST *buffered_fma_head = 0;
468 static MSG_LIST *buffered_fma_tail = 0;
469
470 /* functions  */
471 #define IsFree(a,ind)  !( a& (1<<(ind) ))
472 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
473 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
474
475 CpvDeclare(mempool_type*, mempool);
476
477 /* get the upper bound of log 2 */
478 int mylog2(int size)
479 {
480     int op = size;
481     unsigned int ret=0;
482     unsigned int mask = 0;
483     int i;
484     while(op>0)
485     {
486         op = op >> 1;
487         ret++;
488
489     }
490     for(i=1; i<ret; i++)
491     {
492         mask = mask << 1;
493         mask +=1;
494     }
495
496     ret -= ((size &mask) ? 0:1);
497     return ret;
498 }
499
500 static void
501 allgather(void *in,void *out, int len)
502 {
503     static int *ivec_ptr=NULL,already_called=0,job_size=0;
504     int i,rc;
505     int my_rank;
506     char *tmp_buf,*out_ptr;
507
508     if(!already_called) {
509
510         rc = PMI_Get_size(&job_size);
511         CmiAssert(rc == PMI_SUCCESS);
512         rc = PMI_Get_rank(&my_rank);
513         CmiAssert(rc == PMI_SUCCESS);
514
515         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
516         CmiAssert(ivec_ptr != NULL);
517
518         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
519         CmiAssert(rc == PMI_SUCCESS);
520
521         already_called = 1;
522
523     }
524
525     tmp_buf = (char *)malloc(job_size * len);
526     CmiAssert(tmp_buf);
527
528     rc = PMI_Allgather(in,tmp_buf,len);
529     CmiAssert(rc == PMI_SUCCESS);
530
531     out_ptr = out;
532
533     for(i=0;i<job_size;i++) {
534
535         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
536
537     }
538
539     free(tmp_buf);
540 }
541
542 static void
543 allgather_2(void *in,void *out, int len)
544 {
545     //PMI_Allgather is out of order
546     int i,rc, extend_len;
547     int  rank_index;
548     char *out_ptr, *out_ref;
549     char *in2;
550
551     extend_len = sizeof(int) + len;
552     in2 = (char*)malloc(extend_len);
553
554     memcpy(in2, &myrank, sizeof(int));
555     memcpy(in2+sizeof(int), in, len);
556
557     out_ptr = (char*)malloc(mysize*extend_len);
558
559     rc = PMI_Allgather(in2, out_ptr, extend_len);
560     GNI_RC_CHECK("allgather", rc);
561
562     out_ref = out;
563
564     for(i=0;i<mysize;i++) {
565         //rank index 
566         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
567         //copy to the rank index slot
568         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
569     }
570
571     free(out_ptr);
572     free(in2);
573
574 }
575
576 static unsigned int get_gni_nic_address(int device_id)
577 {
578     unsigned int address, cpu_id;
579     gni_return_t status;
580     int i, alps_dev_id=-1,alps_address=-1;
581     char *token, *p_ptr;
582
583     p_ptr = getenv("PMI_GNI_DEV_ID");
584     if (!p_ptr) {
585         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
586        
587         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
588     } else {
589         while ((token = strtok(p_ptr,":")) != NULL) {
590             alps_dev_id = atoi(token);
591             if (alps_dev_id == device_id) {
592                 break;
593             }
594             p_ptr = NULL;
595         }
596         CmiAssert(alps_dev_id != -1);
597         p_ptr = getenv("PMI_GNI_LOC_ADDR");
598         CmiAssert(p_ptr != NULL);
599         i = 0;
600         while ((token = strtok(p_ptr,":")) != NULL) {
601             if (i == alps_dev_id) {
602                 alps_address = atoi(token);
603                 break;
604             }
605             p_ptr = NULL;
606             ++i;
607         }
608         CmiAssert(alps_address != -1);
609         address = alps_address;
610     }
611     return address;
612 }
613
614 static uint8_t get_ptag(void)
615 {
616     char *p_ptr, *token;
617     uint8_t ptag;
618
619     p_ptr = getenv("PMI_GNI_PTAG");
620     CmiAssert(p_ptr != NULL);
621     token = strtok(p_ptr, ":");
622     ptag = (uint8_t)atoi(token);
623     return ptag;
624         
625 }
626
627 static uint32_t get_cookie(void)
628 {
629     uint32_t cookie;
630     char *p_ptr, *token;
631
632     p_ptr = getenv("PMI_GNI_COOKIE");
633     CmiAssert(p_ptr != NULL);
634     token = strtok(p_ptr, ":");
635     cookie = (uint32_t)atoi(token);
636
637     return cookie;
638 }
639
640 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
641 /* TODO: add any that are related */
642 /* =====End of Definitions of Message-Corruption Related Macros=====*/
643
644
645 #include "machine-lrts.h"
646 #include "machine-common-core.c"
647
648 /* Network progress function is used to poll the network when for
649    messages. This flushes receive buffers on some  implementations*/
650 #if CMK_MACHINE_PROGRESS_DEFINED
651 void CmiMachineProgressImpl() {
652 }
653 #endif
654
655 static void SendRdmaMsg();
656 static void PumpNetworkSmsg();
657 static void PumpLocalRdmaTransactions();
658 static int SendBufferMsg();
659 static     int register_memory_size = 0;
660
661 #if MACHINE_DEBUG_LOG
662 FILE *debugLog = NULL;
663 static CmiInt8 buffered_recv_msg = 0;
664 int         lrts_smsg_success = 0;
665 int         lrts_received_msg = 0;
666 #endif
667
668 static void sweep_mempool(mempool_type *mptr)
669 {
670     block_header *current = &(mptr->block_head);
671
672 printf("[n %d] sweep_mempool slot START .\n", myrank);
673     while( current!= NULL) {
674 printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
675         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
676     }
677 printf("[n %d] sweep_mempool slot END .\n", myrank);
678 }
679
680 inline
681 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
682 {
683     gni_return_t status;
684     block_header *current = *from;
685
686     while(register_memory_size>= MAX_REG_MEM)
687     {
688         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
689             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
690
691         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
692         *from = current;
693         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh, GetSizeFromHeader(current), status)
694         SetMemHndlZero(GetMemHndlFromHeader(current));
695     }
696     return GNI_RC_SUCCESS;
697 }
698
699 inline 
700 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
701 {
702     gni_return_t status = GNI_RC_SUCCESS;
703     //int size = GetMempoolsize(msg);
704     //void *blockaddr = GetMempoolBlockPtr(msg);
705     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
706    
707     block_header *current = &(mptr->block_head);
708
709     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
710     while(1)
711     {
712         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status)
713         if(status == GNI_RC_SUCCESS)
714         {
715             break;
716         }
717         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
718         {
719             CmiAbort("Memory registor for mempool fails\n");
720         }
721         else
722         {
723             status = deregisterMemory(mptr, &current);
724             if (status != GNI_RC_SUCCESS) break;
725         }
726     }; 
727     return status;
728 }
729
730 inline 
731 static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
732 {
733     static int rank = -1;
734     int i;
735     gni_return_t status;
736     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
737     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
738     mempool_type *mptr;
739
740     status = registerFromMempool(mptr1, msg, size, t);
741     if (status == GNI_RC_SUCCESS) return status;
742 #if CMK_SMP
743     for (i=0; i<CmiMyNodeSize()+1; i++) {
744       //rank = (rank+1)%(CmiMyNodeSize()+1);
745       mptr = CpvAccessOther(mempool, i);
746       if (mptr == mptr1) continue;
747       status = registerFromMempool(mptr, msg, size, t);
748       if (status == GNI_RC_SUCCESS) return status;
749     }
750 #endif
751     return  GNI_RC_ERROR_RESOURCE;
752 }
753
754 inline
755 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
756 {
757     MSG_LIST        *msg_tmp;
758     MallocMsgList(msg_tmp);
759     msg_tmp->destNode = destNode;
760     msg_tmp->size   = size;
761     msg_tmp->msg    = msg;
762     msg_tmp->tag    = tag;
763
764 #if !CMK_SMP
765     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
766         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
767         queue->smsg_head_index = destNode;
768         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
769     }else
770     {
771         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
772         queue->smsg_msglist_index[destNode].tail = msg_tmp;
773     }
774 #else
775     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
776 #endif
777 #if PRINT_SYH
778     buffered_smsg_counter++;
779 #endif
780 }
781
782 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
783 {
784     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
785 }
786
787 inline
788 static void setup_smsg_connection(int destNode)
789 {
790     mdh_addr_list_t  *new_entry = 0;
791     gni_post_descriptor_t *pd;
792     gni_smsg_attr_t      *smsg_attr;
793     gni_return_t status = GNI_RC_NOT_DONE;
794     RDMA_REQUEST        *rdma_request_msg;
795     
796     if(smsg_available_slot == smsg_expand_slots)
797     {
798         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
799         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
800         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
801
802         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
803             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
804             GNI_MEM_READWRITE,   
805             -1,
806             &(new_entry->mdh));
807         smsg_available_slot = 0; 
808         new_entry->next = smsg_dynamic_list;
809         smsg_dynamic_list = new_entry;
810     }
811     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
812     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
813     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
814     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
815     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
816     smsg_attr->buff_size = smsg_memlen;
817     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
818     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
819     smsg_local_attr_vec[destNode] = smsg_attr;
820     smsg_available_slot++;
821     MallocPostDesc(pd);
822     pd->type            = GNI_POST_FMA_PUT;
823     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
824     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
825     pd->length          = sizeof(gni_smsg_attr_t);
826     pd->local_addr      = (uint64_t) smsg_attr;
827     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
828     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
829     pd->src_cq_hndl     = 0;
830     pd->rdma_mode       = 0;
831     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
832     print_smsg_attr(smsg_attr);
833     if(status == GNI_RC_ERROR_RESOURCE )
834     {
835         MallocRdmaRequest(rdma_request_msg);
836         rdma_request_msg->destNode = destNode;
837         rdma_request_msg->pd = pd;
838         /* buffer this request */
839     }
840 #if PRINT_SYH
841     if(status != GNI_RC_SUCCESS)
842        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
843     else
844         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
845 #endif
846 }
847
848 /* useDynamicSMSG */
849 inline 
850 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
851 {
852     gni_return_t status = GNI_RC_NOT_DONE;
853
854     if(mailbox_list->offset == mailbox_list->size)
855     {
856         dynamic_smsg_mailbox_t *new_mailbox_entry;
857         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
858         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
859         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
860         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
861         new_mailbox_entry->offset = 0;
862         
863         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
864             new_mailbox_entry->size, smsg_rx_cqh,
865             GNI_MEM_READWRITE,   
866             -1,
867             &(new_mailbox_entry->mem_hndl));
868
869         GNI_RC_CHECK("register", status);
870         new_mailbox_entry->next = mailbox_list;
871         mailbox_list = new_mailbox_entry;
872     }
873     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
874     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
875     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
876     local_smsg_attr->mbox_offset = mailbox_list->offset;
877     mailbox_list->offset += smsg_memlen;
878     local_smsg_attr->buff_size = smsg_memlen;
879     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
880     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
881 }
882
883 /* useDynamicSMSG */
884 inline 
885 static int connect_to(int destNode)
886 {
887     gni_return_t status = GNI_RC_NOT_DONE;
888     CmiAssert(smsg_connected_flag[destNode] == 0);
889     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
890     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
891     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
892     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
893             
894 #if CMK_SMP && !COMM_THREAD_SEND
895     //CmiLock(ep_lock_array[destNode]);
896     CmiLock(tx_cq_lock);
897 #endif
898     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
899 #if CMK_SMP && !COMM_THREAD_SEND
900     //CmiUnlock(ep_lock_array[destNode]);
901     CmiUnlock(tx_cq_lock);
902 #endif
903     if (status == GNI_RC_ERROR_RESOURCE) {
904       /* possibly destNode is making connection at the same time */
905       free(smsg_attr_vector_local[destNode]);
906       smsg_attr_vector_local[destNode] = NULL;
907       free(smsg_attr_vector_remote[destNode]);
908       smsg_attr_vector_remote[destNode] = NULL;
909       mailbox_list->offset -= smsg_memlen;
910       return 0;
911     }
912     GNI_RC_CHECK("GNI_Post", status);
913     smsg_connected_flag[destNode] = 1;
914     return 1;
915 }
916
917 //inline static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
918 inline static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
919 {
920     unsigned int          remote_address;
921     uint32_t              remote_id;
922     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
923     gni_smsg_attr_t       *smsg_attr;
924     gni_post_descriptor_t *pd;
925     gni_post_state_t      post_state;
926     char                  *real_data; 
927
928     if (useDynamicSMSG) {
929         switch (smsg_connected_flag[destNode]) {
930         case 0: 
931             connect_to(destNode);         /* continue to case 1 */
932         case 1:                           /* pending connection, do nothing */
933             status = GNI_RC_NOT_DONE;
934             if(inbuff ==0)
935                 buffer_small_msgs(queue, msg, size, destNode, tag);
936             return status;
937         }
938     }
939 #if CMK_SMP
940     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
941     {
942 #else
943     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
944     {
945 #endif
946 #if CMK_SMP && !COMM_THREAD_SEND
947         CmiLock(tx_cq_lock);
948 #endif
949         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
950 #if CMK_SMP && !COMM_THREAD_SEND
951         CmiUnlock(tx_cq_lock);
952 #endif
953         if(status == GNI_RC_SUCCESS)
954         {
955 #if CMK_SMP_TRACE_COMMTHREAD
956             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
957             { 
958                 START_EVENT();
959                 if ( tag == SMALL_DATA_TAG)
960                     real_data = (char*)msg; 
961                 else 
962                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
963                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
964             }
965 #endif
966             smsg_send_count ++;
967             return status;
968         }else
969             status = GNI_RC_ERROR_RESOURCE;
970     }
971     if(inbuff ==0)
972         buffer_small_msgs(queue, msg, size, destNode, tag);
973     return status;
974 }
975
976
977 inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
978 {
979     /* construct a control message and send */
980     CONTROL_MSG         *control_msg_tmp;
981     MallocControlMsg(control_msg_tmp);
982     control_msg_tmp->source_addr = (uint64_t)msg;
983     control_msg_tmp->seq_id    = seqno;
984     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
985 #if     USE_LRTS_MEMPOOL
986     if(size < BIG_MSG)
987     {
988         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
989     }
990     else
991     {
992         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
993         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
994         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
995     }
996 #else
997     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
998 #endif
999     return control_msg_tmp;
1000 }
1001
1002 // Large message, send control to receiver, receiver register memory and do a GET, 
1003 // return 1 - send no success
1004 inline
1005 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1006 {
1007     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1008     uint32_t            vmdh_index  = -1;
1009     int                 size;
1010     int                 offset = 0;
1011     uint64_t            source_addr;
1012     int                 register_size; 
1013     void                *msg;
1014
1015     size    =   control_msg_tmp->total_length;
1016     source_addr = control_msg_tmp->source_addr;
1017     register_size = control_msg_tmp->length;
1018
1019 #if     USE_LRTS_MEMPOOL
1020     if(buffered_send_msg >= MAX_BUFF_SEND)
1021     {
1022         if(!inbuff)
1023             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1024         return GNI_RC_ERROR_NOMEM;
1025     }
1026     if( control_msg_tmp->seq_id == 0 ){
1027         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1028         {
1029             //register the corresponding mempool
1030             msg = (void*)source_addr;
1031             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1032             if(status == GNI_RC_SUCCESS)
1033             {
1034                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1035             }
1036         }else
1037         {
1038             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1039             status = GNI_RC_SUCCESS;
1040         }
1041         if(NoMsgInSend( control_msg_tmp->source_addr))
1042             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1043         else
1044             register_size = 0;
1045     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1046     {
1047         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1048         source_addr += offset;
1049         size = control_msg_tmp->length;
1050         status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl)); 
1051         //MEMORY_REGISTER(onesided_hnd, nic_hndl, source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1052         register_size = ALIGN64(size);  
1053     }
1054
1055     if(status == GNI_RC_SUCCESS)
1056     {
1057         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
1058         if(status == GNI_RC_SUCCESS)
1059         {
1060             buffered_send_msg += register_size;
1061             if(control_msg_tmp->seq_id == 0)
1062             {
1063                 IncreaseMsgInSend(source_addr);
1064             }
1065             FreeControlMsg(control_msg_tmp);
1066             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1067         }else
1068             status = GNI_RC_ERROR_RESOURCE;
1069
1070     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1071     {
1072         CmiAbort("Memory registor for large msg\n");
1073     }else 
1074     {
1075         status = GNI_RC_ERROR_NOMEM; 
1076         if(!inbuff)
1077             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1078     }
1079     return status;
1080 #else
1081     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1082     if(status == GNI_RC_SUCCESS)
1083     {
1084         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
1085         if(status == GNI_RC_SUCCESS)
1086         {
1087             FreeControlMsg(control_msg_tmp);
1088         }
1089     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1090     {
1091         CmiAbort("Memory registor for large msg\n");
1092     }else 
1093     {
1094         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1095     }
1096     return status;
1097 #endif
1098 }
1099
1100 inline void LrtsPrepareEnvelope(char *msg, int size)
1101 {
1102     CmiSetMsgSize(msg, size);
1103 }
1104
1105 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1106 {
1107     gni_return_t        status  =   GNI_RC_SUCCESS;
1108     uint8_t tag;
1109     CONTROL_MSG         *control_msg_tmp;
1110     int                 oob = ( mode & OUT_OF_BAND);
1111     SMSG_QUEUE          *queue;
1112
1113     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1114 #if CMK_USE_OOB
1115     queue = oob? &smsg_oob_queue : &smsg_queue;
1116 #else
1117     queue = &smsg_queue;
1118 #endif
1119
1120     LrtsPrepareEnvelope(msg, size);
1121
1122 #if PRINT_SYH
1123     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1124 #endif 
1125 #if CMK_SMP && COMM_THREAD_SEND
1126     if(size <= SMSG_MAX_MSG)
1127         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1128     else if (size < BIG_MSG) {
1129         control_msg_tmp =  construct_control_msg(size, msg, 0);
1130         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1131     }
1132     else {
1133           CmiSetMsgSeq(msg, 0);
1134           control_msg_tmp =  construct_control_msg(size, msg, 1);
1135           buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1136     }
1137 #else //non-smp, smp(worker sending)
1138     if(size <= SMSG_MAX_MSG)
1139     {
1140         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1141             CmiFree(msg);
1142     }
1143     else if (size < BIG_MSG) {
1144         control_msg_tmp =  construct_control_msg(size, msg, 0);
1145         send_large_messages(queue, destNode, control_msg_tmp, 0);
1146     }
1147     else {
1148 #if     USE_LRTS_MEMPOOL
1149         CmiSetMsgSeq(msg, 0);
1150         control_msg_tmp =  construct_control_msg(size, msg, 1);
1151         send_large_messages(queue, destNode, control_msg_tmp, 0);
1152 #else
1153         control_msg_tmp =  construct_control_msg(size, msg, 0);
1154         send_large_messages(queue, destNode, control_msg_tmp, 0);
1155 #endif
1156     }
1157 #endif
1158     return 0;
1159 }
1160
1161 static void    PumpDatagramConnection();
1162 static void registerUserTraceEvents() {
1163 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1164     traceRegisterUserEvent("setting up connections", 10);
1165     traceRegisterUserEvent("Receiving small msgs", 20);
1166     traceRegisterUserEvent("Release local transaction", 30);
1167     traceRegisterUserEvent("Sending buffered small msgs", 40);
1168     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1169 #endif
1170 }
1171
1172 static void ProcessDeadlock()
1173 {
1174     static CmiUInt8 *ptr = NULL;
1175     static CmiUInt8  last = 0, mysum, sum;
1176     static int count = 0;
1177     gni_return_t status;
1178     int i;
1179
1180 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1181     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1182     mysum = smsg_send_count + smsg_recv_count;
1183     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1184     GNI_RC_CHECK("PMI_Allgather", status);
1185     sum = 0;
1186     for (i=0; i<mysize; i++)  sum+= ptr[i];
1187     if (last == 0 || sum == last) 
1188         count++;
1189     else
1190         count = 0;
1191     last = sum;
1192     if (count == 2) { 
1193         /* detected twice, it is a real deadlock */
1194         if (myrank == 0)  {
1195             CmiPrintf("Charm++> network progress engine stalled, program may hang. Try set environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX to limit the registered memory usage.\n");
1196             CmiAbort("Fatal> Deadlock detected.");
1197         }
1198     }
1199     _detected_hang = 0;
1200 }
1201
1202 static void CheckProgress()
1203 {
1204     if (smsg_send_count == last_smsg_send_count &&
1205         smsg_recv_count == last_smsg_recv_count ) 
1206     {
1207         _detected_hang = 1;
1208 #if !CMK_SMP
1209         if (_detected_hang) ProcessDeadlock();
1210 #endif
1211     }
1212     else {
1213         last_smsg_send_count = smsg_send_count;
1214         last_smsg_recv_count = smsg_recv_count;
1215     }
1216 }
1217
1218 void LrtsPostCommonInit(int everReturn)
1219 {
1220 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1221     CpvInitialize(double, projTraceStart);
1222     /* only PE 0 needs to care about registration (to generate sts file). */
1223     if (CmiMyPe() == 0) {
1224         registerMachineUserEventsFunction(&registerUserTraceEvents);
1225     }
1226 #endif
1227
1228 #if CMK_SMP
1229     CmiIdleState *s=CmiNotifyGetState();
1230     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1231     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1232 #else
1233     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1234     if (useDynamicSMSG)
1235     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1236 #endif
1237
1238     if (_checkProgress)
1239 #if CMK_SMP
1240     if (CmiMyRank() == 0)
1241 #endif
1242     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1243 }
1244
1245 /* this is called by worker thread */
1246 void LrtsPostNonLocal(){
1247 #if CMK_SMP
1248 #if !COMM_THREAD_SEND
1249     if(mysize == 1) return;
1250     PumpLocalRdmaTransactions();
1251     SendBufferMsg();
1252     SendRdmaMsg();
1253 #endif
1254 #endif
1255 }
1256
1257 /* useDynamicSMSG */
1258 static void    PumpDatagramConnection()
1259 {
1260     uint32_t          remote_address;
1261     uint32_t          remote_id;
1262     gni_return_t status;
1263     gni_post_state_t  post_state;
1264     uint64_t          datagram_id;
1265     int i;
1266
1267    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1268    {
1269        if (datagram_id >= mysize) {           /* bound endpoint */
1270            int pe = datagram_id - mysize;
1271 #if CMK_SMP && !COMM_THREAD_SEND
1272            CmiLock(tx_cq_lock);
1273 #endif
1274            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1275 #if CMK_SMP && !COMM_THREAD_SEND
1276            CmiUnlock(tx_cq_lock);
1277 #endif
1278            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1279            {
1280                CmiAssert(remote_id == pe);
1281                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1282                GNI_RC_CHECK("Dynamic SMSG Init", status);
1283 #if PRINT_SYH
1284                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1285 #endif
1286                CmiAssert(smsg_connected_flag[pe] == 1);
1287                smsg_connected_flag[pe] = 2;
1288            }
1289        }
1290        else {         /* unbound ep */
1291            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1292            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1293            {
1294                CmiAssert(remote_id<mysize);
1295                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1296                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1297                GNI_RC_CHECK("Dynamic SMSG Init", status);
1298 #if PRINT_SYH
1299                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1300 #endif
1301                smsg_connected_flag[remote_id] = 2;
1302
1303                alloc_smsg_attr(&send_smsg_attr);
1304                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1305                GNI_RC_CHECK("post unbound datagram", status);
1306            }
1307        }
1308    }
1309 }
1310
1311 /* pooling CQ to receive network message */
1312 static void PumpNetworkRdmaMsgs()
1313 {
1314     gni_cq_entry_t      event_data;
1315     gni_return_t        status;
1316
1317 }
1318
1319 inline 
1320 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1321 {
1322     RDMA_REQUEST        *rdma_request_msg;
1323     MallocRdmaRequest(rdma_request_msg);
1324     rdma_request_msg->destNode = inst_id;
1325     rdma_request_msg->pd = pd;
1326 #if CMK_SMP
1327     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1328 #else
1329     if(sendRdmaBuf == 0)
1330     {
1331         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1332     }else{
1333         sendRdmaTail->next = rdma_request_msg;
1334         sendRdmaTail =  rdma_request_msg;
1335     }
1336 #endif
1337
1338 }
1339 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1340 static void PumpNetworkSmsg()
1341 {
1342     uint64_t            inst_id;
1343     int                 ret;
1344     gni_cq_entry_t      event_data;
1345     gni_return_t        status;
1346     void                *header;
1347     uint8_t             msg_tag;
1348     int                 msg_nbytes;
1349     void                *msg_data;
1350     gni_mem_handle_t    msg_mem_hndl;
1351     gni_smsg_attr_t     *smsg_attr;
1352     gni_smsg_attr_t     *remote_smsg_attr;
1353     int                 init_flag;
1354     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1355     uint64_t            source_addr;
1356     SMSG_QUEUE         *queue = &smsg_queue;
1357
1358 #if CMK_SMP && !COMM_THREAD_SEND
1359     while(1)
1360     {
1361         CmiLock(tx_cq_lock);
1362         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1363         CmiUnlock(tx_cq_lock);
1364         if(status != GNI_RC_SUCCESS)
1365             break;
1366 #else
1367     while ( (status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
1368     {
1369 #endif
1370         inst_id = GNI_CQ_GET_INST_ID(event_data);
1371         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1372 #if PRINT_SYH
1373         printf("[%d] PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
1374 #endif
1375         if (useDynamicSMSG) {
1376             /* subtle: smsg may come before connection is setup */
1377             while (smsg_connected_flag[inst_id] != 2) 
1378                PumpDatagramConnection();
1379         }
1380         msg_tag = GNI_SMSG_ANY_TAG;
1381 #if CMK_SMP && !COMM_THREAD_SEND
1382         while(1) {
1383             CmiLock(tx_cq_lock);
1384             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1385             CmiUnlock(tx_cq_lock);
1386             if (status != GNI_RC_SUCCESS)
1387                 break;
1388 #else
1389         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS){
1390 #endif
1391             /* copy msg out and then put into queue (small message) */
1392             switch (msg_tag) {
1393             case SMALL_DATA_TAG:
1394             {
1395                 START_EVENT();
1396                 msg_nbytes = CmiGetMsgSize(header);
1397                 msg_data    = CmiAlloc(msg_nbytes);
1398                 memcpy(msg_data, (char*)header, msg_nbytes);
1399 #if CMK_SMP_TRACE_COMMTHREAD
1400                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1401 #endif
1402                 handleOneRecvedMsg(msg_nbytes, msg_data);
1403                 break;
1404             }
1405             case LMSG_INIT_TAG:
1406             {
1407 #if PRINT_SYH
1408                 printf("[%d] from %d request for Large msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
1409 #endif
1410                 getLargeMsgRequest(header, inst_id);
1411                 break;
1412             }
1413             case ACK_TAG:   //msg fit into mempool
1414             {
1415                 /* Get is done, release message . Now put is not used yet*/
1416                 void *msg = (void*)(((CONTROL_MSG *)header)->source_addr);
1417 #if ! USE_LRTS_MEMPOOL
1418                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh, header_tmp->length);
1419 #else
1420                 DecreaseMsgInSend(msg);
1421 #endif
1422                 if(NoMsgInSend(msg))
1423                     buffered_send_msg -= GetMempoolsize(msg);
1424                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1425                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
1426                 break;
1427             }
1428             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1429             {
1430                 header_tmp = (CONTROL_MSG *) header;
1431                 void *msg = (void*)(header_tmp->source_addr);
1432                 int cur_seq = CmiGetMsgSeq(msg);
1433                 int offset = ONE_SEG*(cur_seq+1);
1434                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1435                 buffered_send_msg -= header_tmp->length;
1436                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1437                 if (remain_size < 0) remain_size = 0;
1438                 CmiSetMsgSize(msg, remain_size);
1439                 if(remain_size <= 0) //transaction done
1440                 {
1441                     CmiFree(msg);
1442                 }else if (header_tmp->total_length > offset)
1443                 {
1444                     CmiSetMsgSeq(msg, cur_seq+1);
1445                     MallocControlMsg(control_msg_tmp);
1446                     control_msg_tmp->source_addr = header_tmp->source_addr;
1447                     control_msg_tmp->dest_addr    = header_tmp->dest_addr;
1448                     control_msg_tmp->total_length   = header_tmp->total_length; 
1449                     control_msg_tmp->length         = header_tmp->total_length - offset;
1450                     if (control_msg_tmp->length >= ONE_SEG) control_msg_tmp->length = ONE_SEG;
1451                     control_msg_tmp->seq_id         = cur_seq+1+1;
1452                     //send next seg
1453                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1454                          // pipelining
1455                     if (header_tmp->seq_id == 1) {
1456                       int i;
1457                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1458                         int seq = cur_seq+i+2;
1459                         CmiSetMsgSeq(msg, seq-1);
1460                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1461                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1462                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1463                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1464                       }
1465                     }
1466                 }
1467                 break;
1468             }
1469 #if CMK_PERSISTENT_COMM
1470             case PUT_DONE_TAG: //persistent message
1471                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1472                 int size = ((CONTROL_MSG *) header)->length;
1473                 CmiReference(msg);
1474                 handleOneRecvedMsg(size, msg); 
1475                 break;
1476 #endif
1477 #ifdef CMK_DIRECT
1478             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1479                 (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1480                 break;
1481 #endif
1482             default: {
1483                 printf("weird tag problem\n");
1484                 CmiAbort("Unknown tag\n");
1485                      }
1486             }               // end switch
1487 #if CMK_SMP && !COMM_THREAD_SEND
1488             CmiLock(tx_cq_lock);
1489 #endif
1490             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1491 #if CMK_SMP && !COMM_THREAD_SEND
1492             CmiUnlock(tx_cq_lock);
1493 #endif
1494             smsg_recv_count ++;
1495             msg_tag = GNI_SMSG_ANY_TAG;
1496         } //endwhile getNext
1497     }   //end while GetEvent
1498     if(status == GNI_RC_ERROR_RESOURCE)
1499     {
1500         GNI_RC_CHECK("Smsg_rx_cq full", status);
1501     }
1502 }
1503
1504 static void printDesc(gni_post_descriptor_t *pd)
1505 {
1506     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1507 }
1508
1509 // for BIG_MSG called on receiver side for receiving control message
1510 // LMSG_INIT_TAG
1511 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1512 {
1513 #if     USE_LRTS_MEMPOOL
1514     CONTROL_MSG         *request_msg;
1515     gni_return_t        status = GNI_RC_SUCCESS;
1516     void                *msg_data;
1517     gni_post_descriptor_t *pd;
1518     gni_mem_handle_t    msg_mem_hndl;
1519     int source, size, transaction_size, offset = 0;
1520     int     register_size = 0;
1521
1522     // initial a get to transfer data from the sender side */
1523     request_msg = (CONTROL_MSG *) header;
1524     size = request_msg->total_length;
1525     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1526     if(request_msg->seq_id < 2)   {
1527         msg_data = CmiAlloc(size);
1528         CmiSetMsgSeq(msg_data, 0);
1529         _MEMCHECK(msg_data);
1530     }
1531     else {
1532         offset = ONE_SEG*(request_msg->seq_id-1);
1533         msg_data = (char*)request_msg->dest_addr + offset;
1534     }
1535    
1536     MallocPostDesc(pd);
1537     pd->cqwrite_value = request_msg->seq_id;
1538     if( request_msg->seq_id == 0)
1539     {
1540         pd->local_mem_hndl= GetMemHndl(msg_data);
1541         transaction_size = ALIGN64(size);
1542         if(IsMemHndlZero(pd->local_mem_hndl))
1543         {   
1544             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1545             if(status == GNI_RC_SUCCESS)
1546             {
1547                 pd->local_mem_hndl = GetMemHndl(msg_data);
1548             }
1549             else
1550             {
1551                 SetMemHndlZero(pd->local_mem_hndl);
1552             }
1553         }
1554         if(NoMsgInRecv( (void*)(msg_data)))
1555             register_size = GetMempoolsize((void*)(msg_data));
1556         else
1557             register_size = 0;
1558     }
1559     else{
1560         transaction_size = ALIGN64(request_msg->length);
1561         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1562         //MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, transaction_size, &(pd->local_mem_hndl), &omdh, status)
1563         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1564         {
1565             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1566         }
1567     }
1568     pd->first_operand = ALIGN64(size);                   //  total length
1569
1570     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1571         pd->type            = GNI_POST_FMA_GET;
1572     else
1573         pd->type            = GNI_POST_RDMA_GET;
1574 #if REMOTE_EVENT
1575     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1576 #else
1577     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1578 #endif
1579     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1580     pd->length          = transaction_size;
1581     pd->local_addr      = (uint64_t) msg_data;
1582     pd->remote_addr     = request_msg->source_addr + offset;
1583     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1584     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1585     pd->rdma_mode       = 0;
1586     pd->amo_cmd         = 0;
1587
1588     //memory registration success
1589     if(status == GNI_RC_SUCCESS)
1590     {
1591 #if CMK_SMP && !COMM_THREAD_SEND
1592         CmiLock(tx_cq_lock);
1593 #endif
1594         if(pd->type == GNI_POST_RDMA_GET) 
1595             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1596         else
1597             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1598 #if CMK_SMP && !COMM_THREAD_SEND
1599         CmiUnlock(tx_cq_lock);
1600 #endif
1601          
1602         if(status == GNI_RC_SUCCESS )
1603         {
1604             if(pd->cqwrite_value == 0)
1605             {
1606 #if MACHINE_DEBUG_LOG
1607                 buffered_recv_msg += register_size;
1608                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1609 #endif
1610                 IncreaseMsgInRecv(msg_data);
1611
1612             }
1613         }
1614     }else
1615     {
1616         SetMemHndlZero(pd->local_mem_hndl);
1617     }
1618     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1619     {
1620         bufferRdmaMsg(inst_id, pd); 
1621     }else {
1622         /* printf("source: %d pd:%p\n", source, pd); */
1623         GNI_RC_CHECK("AFter posting", status);
1624     }
1625 #else
1626     CONTROL_MSG         *request_msg;
1627     gni_return_t        status;
1628     void                *msg_data;
1629     gni_post_descriptor_t *pd;
1630     RDMA_REQUEST        *rdma_request_msg;
1631     gni_mem_handle_t    msg_mem_hndl;
1632     //int source;
1633     // initial a get to transfer data from the sender side */
1634     request_msg = (CONTROL_MSG *) header;
1635     msg_data = CmiAlloc(request_msg->length);
1636     _MEMCHECK(msg_data);
1637
1638     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1639
1640     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1641     {
1642         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1643     }
1644
1645     MallocPostDesc(pd);
1646     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1647         pd->type            = GNI_POST_FMA_GET;
1648     else
1649         pd->type            = GNI_POST_RDMA_GET;
1650 #if REMOTE_EVENT
1651     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1652 #else
1653     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1654 #endif
1655     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1656     pd->length          = ALIGN64(request_msg->length);
1657     pd->local_addr      = (uint64_t) msg_data;
1658     pd->remote_addr     = request_msg->source_addr;
1659     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1660     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1661     pd->rdma_mode       = 0;
1662     pd->amo_cmd         = 0;
1663
1664     //memory registration successful
1665     if(status == GNI_RC_SUCCESS)
1666     {
1667         pd->local_mem_hndl  = msg_mem_hndl;
1668 #if CMK_SMP && !COMM_THREAD_SEND
1669             CmiLock(tx_cq_lock);
1670 #endif
1671         if(pd->type == GNI_POST_RDMA_GET) 
1672             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1673         else
1674             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1675 #if CMK_SMP && !COMM_THREAD_SEND
1676             CmiUnlock(tx_cq_lock);
1677 #endif
1678     }else
1679     {
1680         SetMemHndlZero(pd->local_mem_hndl);
1681     }
1682     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1683     {
1684         MallocRdmaRequest(rdma_request_msg);
1685         rdma_request_msg->next = 0;
1686         rdma_request_msg->destNode = inst_id;
1687         rdma_request_msg->pd = pd;
1688         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1689     }else {
1690         GNI_RC_CHECK("AFter posting", status);
1691     }
1692 #endif
1693 }
1694
1695 static void PumpLocalRdmaTransactions()
1696 {
1697     gni_cq_entry_t          ev;
1698     gni_return_t            status;
1699     uint64_t                type, inst_id;
1700     gni_post_descriptor_t   *tmp_pd;
1701     MSG_LIST                *ptr;
1702     CONTROL_MSG             *ack_msg_tmp;
1703     uint8_t             msg_tag;
1704 #ifdef CMK_DIRECT
1705     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1706 #endif
1707     SMSG_QUEUE         *queue = &smsg_queue;
1708
1709 #if CMK_SMP && !COMM_THREAD_SEND
1710     while(1) {
1711         CmiLock(tx_cq_lock);
1712         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1713         CmiUnlock(tx_cq_lock);
1714         if(status != GNI_RC_SUCCESS) break;
1715 #else
1716     while( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1717     {
1718 #endif
1719         type = GNI_CQ_GET_TYPE(ev);
1720         if (type == GNI_CQ_EVENT_TYPE_POST)
1721         {
1722             inst_id     = GNI_CQ_GET_INST_ID(ev);
1723 #if PRINT_SYH
1724             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1725 #endif
1726 #if CMK_SMP && !COMM_THREAD_SEND
1727             CmiLock(tx_cq_lock);
1728 #endif
1729             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1730 #if CMK_SMP && !COMM_THREAD_SEND
1731             CmiUnlock(tx_cq_lock);
1732 #endif
1733 #ifdef CMK_DIRECT
1734             if(tmp_pd->amo_cmd == 1)
1735             {
1736                 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1737                 cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
1738                 cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
1739                 cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
1740             }
1741             else{
1742                 MallocControlMsg(ack_msg_tmp);
1743                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1744                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1745             }
1746 #else
1747             MallocControlMsg(ack_msg_tmp);
1748             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1749             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1750 #endif
1751             ////Message is sent, free message , put is not used now
1752             switch (tmp_pd->type) {
1753 #if CMK_PERSISTENT_COMM
1754             case GNI_POST_RDMA_PUT:
1755 #if ! USE_LRTS_MEMPOOL
1756                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1757 #endif
1758             case GNI_POST_FMA_PUT:
1759                 CmiFree((void *)tmp_pd->local_addr);
1760                 msg_tag = PUT_DONE_TAG;
1761                 break;
1762 #endif
1763 #ifdef CMK_DIRECT
1764             case GNI_POST_RDMA_PUT:
1765             case GNI_POST_FMA_PUT:
1766                 //sender ACK to receiver to trigger it is done
1767                 msg_tag = DIRECT_PUT_DONE_TAG;
1768                 break;
1769 #endif
1770             case GNI_POST_RDMA_GET:
1771             case GNI_POST_FMA_GET:
1772 #if  ! USE_LRTS_MEMPOOL
1773                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1774                 msg_tag = ACK_TAG;  
1775 #else
1776                 ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
1777                 if(ack_msg_tmp->seq_id > 0)      // BIG_MSG
1778                 {
1779                     msg_tag = BIG_MSG_TAG; 
1780                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1781                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1782                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1783                 } 
1784                 else
1785                 {
1786                     msg_tag = ACK_TAG;  
1787                     ack_msg_tmp->dest_addr = tmp_pd->local_addr;
1788                 }
1789                 ack_msg_tmp->length = tmp_pd->length;
1790                 ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1791 #endif
1792                 break;
1793             default:
1794                 CmiPrintf("type=%d\n", tmp_pd->type);
1795                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1796             }
1797 #if CMK_DIRECT
1798             if(tmp_pd->amo_cmd == 1)
1799                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1800             else
1801 #endif
1802                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
1803             if(status == GNI_RC_SUCCESS)
1804             {
1805 #if CMK_DIRECT
1806                 if(tmp_pd->amo_cmd == 1)
1807                     free(cmk_direct_done_msg); 
1808                 else
1809 #endif
1810                     FreeControlMsg(ack_msg_tmp);
1811
1812             }
1813 #if CMK_PERSISTENT_COMM
1814             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1815 #endif
1816             {
1817                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1818 #if PRINT_SYH
1819                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1820 #endif
1821 #if CMK_SMP_TRACE_COMMTHREAD
1822                     START_EVENT();
1823 #endif
1824                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1825                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1826 #if MACHINE_DEBUG_LOG
1827                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1828                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1829                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1830 #endif
1831 #if CMK_SMP_TRACE_COMMTHREAD
1832                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1833 #endif
1834                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1835                 }else if(msg_tag == BIG_MSG_TAG){
1836                     void *msg = (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1837                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1838                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1839 #if CMK_SMP_TRACE_COMMTHREAD
1840                         START_EVENT();
1841 #endif
1842 #if PRINT_SYH
1843                         printf("Pipeline msg done [%d]\n", myrank);
1844 #endif
1845 #if CMK_SMP_TRACE_COMMTHREAD
1846                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1847 #endif
1848                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1849                     }
1850                 }
1851             }
1852             FreePostDesc(tmp_pd);
1853         }
1854     } //end while
1855     if(status == GNI_RC_ERROR_RESOURCE)
1856     {
1857         GNI_RC_CHECK("Smsg_tx_cq full", status);
1858     }
1859 }
1860
1861 static void  SendRdmaMsg()
1862 {
1863     gni_return_t            status = GNI_RC_SUCCESS;
1864     gni_mem_handle_t        msg_mem_hndl;
1865     RDMA_REQUEST *ptr = 0, *tmp_ptr;
1866     RDMA_REQUEST *pre = 0;
1867     int i, register_size = 0;
1868     void                    *msg;
1869 #if CMK_SMP
1870     int len = PCQueueLength(sendRdmaBuf);
1871     for (i=0; i<len; i++)
1872     {
1873         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
1874         if (ptr == NULL) break;
1875 #else
1876     ptr = sendRdmaBuf;
1877     while (ptr!=0)
1878     {
1879 #endif 
1880         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1881         gni_post_descriptor_t *pd = ptr->pd;
1882         status = GNI_RC_SUCCESS;
1883         
1884         if(pd->cqwrite_value == 0)
1885         {
1886             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
1887             {
1888                 msg = (void*)(pd->local_addr);
1889                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1890                 if(status == GNI_RC_SUCCESS)
1891                 {
1892                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1893                 }
1894             }else
1895             {
1896                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1897             }
1898             if(NoMsgInRecv( (void*)(pd->local_addr)))
1899                 register_size = GetMempoolsize((void*)(pd->local_addr));
1900             else
1901                 register_size = 0;
1902         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
1903         {
1904             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
1905             //MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh, status)
1906         }
1907         if(status == GNI_RC_SUCCESS)        //mem register good
1908         {
1909 #if CMK_SMP && !COMM_THREAD_SEND
1910             CmiLock(tx_cq_lock);
1911 #endif
1912             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1913                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1914             else
1915                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1916 #if CMK_SMP && !COMM_THREAD_SEND
1917             CmiUnlock(tx_cq_lock);
1918 #endif
1919
1920             if(status == GNI_RC_SUCCESS)    //post good
1921             {
1922 #if !CMK_SMP
1923                 tmp_ptr = ptr;
1924                 if(pre != 0) {
1925                     pre->next = ptr->next;
1926                 }
1927                 else {
1928                     sendRdmaBuf = ptr->next;
1929                 }
1930                 ptr = ptr->next;
1931                 FreeRdmaRequest(tmp_ptr);
1932 #endif
1933                 if(pd->cqwrite_value == 0)
1934                 {
1935                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
1936                 }
1937 #if MACHINE_DEBUG_LOG
1938                 buffered_recv_msg += register_size;
1939                 MACHSTATE(8, "GO request from buffered\n"); 
1940 #endif
1941             }else           // cannot post
1942             {
1943 #if CMK_SMP
1944                 PCQueuePush(sendRdmaBuf, (char*)ptr);
1945 #else
1946                 pre = ptr;
1947                 ptr = ptr->next;
1948 #endif
1949                 break;
1950             }
1951         } else          //memory registration fails
1952         {
1953 #if CMK_SMP
1954             PCQueuePush(sendRdmaBuf, (char*)ptr);
1955 #else
1956             pre = ptr;
1957             ptr = ptr->next;
1958 #endif
1959         }
1960     } //end while
1961 #if ! CMK_SMP
1962     if(ptr == 0)
1963         sendRdmaTail = pre;
1964 #endif
1965 }
1966
1967 // return 1 if all messages are sent
1968 static int SendBufferMsg(SMSG_QUEUE *queue)
1969 {
1970     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
1971     CONTROL_MSG         *control_msg_tmp;
1972     gni_return_t        status;
1973     int                 done = 1;
1974     int                 register_size;
1975     void                *register_addr;
1976     int                 index_previous = -1;
1977     int                 index = queue->smsg_head_index;
1978 #if CMI_EXERT_SEND_CAP
1979     int                 sent_cnt = 0;
1980 #endif
1981
1982 #if ! CMK_SMP
1983     index = queue->smsg_head_index;
1984 #else
1985     index = 0;
1986 #endif
1987 #if CMK_SMP
1988     while(index <mysize)
1989     {
1990         int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
1991         for (i=0; i<len; i++) 
1992         {
1993             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
1994             if (ptr == 0) break;
1995 #else
1996     while(index != -1)
1997     {
1998         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
1999         pre = 0;
2000         while(ptr != 0)
2001         {
2002 #endif
2003             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_mempool, ptr->tag); 
2004             status = GNI_RC_ERROR_RESOURCE;
2005             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2006                 /* connection not exists yet */
2007             }
2008             else
2009             switch(ptr->tag)
2010             {
2011             case SMALL_DATA_TAG:
2012                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2013                 if(status == GNI_RC_SUCCESS)
2014                 {
2015                     CmiFree(ptr->msg);
2016                 }
2017                 break;
2018             case LMSG_INIT_TAG:
2019                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2020                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2021                 break;
2022             case   ACK_TAG:
2023             case   BIG_MSG_TAG:
2024                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
2025                 if(status == GNI_RC_SUCCESS)
2026                 {
2027                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2028                 }
2029                 break;
2030 #ifdef CMK_DIRECT
2031             case DIRECT_PUT_DONE_TAG:
2032                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2033                 if(status == GNI_RC_SUCCESS)
2034                 {
2035                     free((CMK_DIRECT_HEADER*)ptr->msg);
2036                 }
2037                 break;
2038
2039 #endif
2040             default:
2041                 printf("Weird tag\n");
2042                 CmiAbort("should not happen\n");
2043             }       // end switch
2044             if(status == GNI_RC_SUCCESS)
2045             {
2046 #if !CMK_SMP
2047                 tmp_ptr = ptr;
2048                 if(pre)
2049                 {
2050                     ptr = pre ->next = ptr->next;
2051                 }else
2052                 {
2053                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2054                 }
2055                 FreeMsgList(tmp_ptr);
2056 #else
2057                 FreeMsgList(ptr);
2058 #endif
2059 #if PRINT_SYH
2060                 buffered_smsg_counter--;
2061                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2062 #endif
2063 #if CMI_EXERT_SEND_CAP
2064                 sent_cnt++;
2065                 if(sent_cnt == SEND_CAP)
2066                     break;
2067 #endif
2068             }else {
2069 #if CMK_SMP
2070                 PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
2071 #else
2072                 pre = ptr;
2073                 ptr=ptr->next;
2074 #endif
2075                 done = 0;
2076                 if(status == GNI_RC_ERROR_RESOURCE)
2077                     break;
2078             } 
2079         } //end while
2080 #if !CMK_SMP
2081         if(ptr == 0)
2082             queue->smsg_msglist_index[index].tail = pre;
2083         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2084         {
2085             if(index_previous != -1)
2086                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2087             else
2088                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2089         }else
2090         {
2091             index_previous = index;
2092         }
2093         index = queue->smsg_msglist_index[index].next;
2094 #else
2095         index++;
2096 #endif
2097
2098 #if CMI_EXERT_SEND_CAP
2099         if(sent_cnt == SEND_CAP)
2100                 break;
2101 #endif
2102     }   // end pooling for all cores
2103     return done;
2104 }
2105
2106 static void ProcessDeadlock();
2107
2108 void LrtsAdvanceCommunication(int whileidle)
2109 {
2110     /*  Receive Msg first */
2111     //MACHSTATE(8, "calling advance comm \n") ; 
2112 #if CMK_SMP_TRACE_COMMTHREAD
2113     double startT, endT;
2114 #endif
2115     if (useDynamicSMSG && whileidle)
2116     {
2117 #if CMK_SMP_TRACE_COMMTHREAD
2118         startT = CmiWallTimer();
2119 #endif
2120         PumpDatagramConnection();
2121 #if CMK_SMP_TRACE_COMMTHREAD
2122         endT = CmiWallTimer();
2123         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2124 #endif
2125     }
2126
2127 #if CMK_SMP_TRACE_COMMTHREAD
2128     startT = CmiWallTimer();
2129 #endif
2130     PumpNetworkSmsg();
2131     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2132 #if CMK_SMP_TRACE_COMMTHREAD
2133     endT = CmiWallTimer();
2134     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2135 #endif
2136
2137 #if CMK_SMP_TRACE_COMMTHREAD
2138     startT = CmiWallTimer();
2139 #endif
2140     PumpLocalRdmaTransactions();
2141     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2142 #if CMK_SMP_TRACE_COMMTHREAD
2143     endT = CmiWallTimer();
2144     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2145 #endif
2146     /* Send buffered Message */
2147 #if CMK_SMP_TRACE_COMMTHREAD
2148     startT = CmiWallTimer();
2149 #endif
2150 #if CMK_USE_OOB
2151     if (SendBufferMsg(&smsg_oob_queue) == 1)
2152 #endif
2153     SendBufferMsg(&smsg_queue);
2154     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2155 #if CMK_SMP_TRACE_COMMTHREAD
2156     endT = CmiWallTimer();
2157     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2158 #endif
2159
2160 #if CMK_SMP_TRACE_COMMTHREAD
2161     startT = CmiWallTimer();
2162 #endif
2163     SendRdmaMsg();
2164     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2165 #if CMK_SMP_TRACE_COMMTHREAD
2166     endT = CmiWallTimer();
2167     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2168 #endif
2169
2170 #if CMK_SMP
2171     if (_detected_hang)  ProcessDeadlock();
2172 #endif
2173 }
2174
2175 /* useDynamicSMSG */
2176 static void _init_dynamic_smsg()
2177 {
2178     gni_return_t status;
2179     uint32_t     vmdh_index = -1;
2180     int i;
2181
2182     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2183     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2184     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2185     for(i=0; i<mysize; i++) {
2186         smsg_connected_flag[i] = 0;
2187         smsg_attr_vector_local[i] = NULL;
2188         smsg_attr_vector_remote[i] = NULL;
2189     }
2190     if(mysize <=512)
2191     {
2192         SMSG_MAX_MSG = 4096;
2193     }else if (mysize <= 4096)
2194     {
2195         SMSG_MAX_MSG = 4096/mysize * 1024;
2196     }else if (mysize <= 16384)
2197     {
2198         SMSG_MAX_MSG = 512;
2199     }else {
2200         SMSG_MAX_MSG = 256;
2201     }
2202
2203     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2204     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2205     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2206     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2207     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2208
2209     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2210     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2211     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2212     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2213     mailbox_list->offset = 0;
2214     mailbox_list->next = 0;
2215     
2216     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2217         mailbox_list->size, smsg_rx_cqh,
2218         GNI_MEM_READWRITE,   
2219         vmdh_index,
2220         &(mailbox_list->mem_hndl));
2221     GNI_RC_CHECK("MEMORY registration for smsg", status);
2222
2223     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2224     GNI_RC_CHECK("Unbound EP", status);
2225     
2226     alloc_smsg_attr(&send_smsg_attr);
2227
2228     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2229     GNI_RC_CHECK("post unbound datagram", status);
2230
2231       /* always pre-connect to proc 0 */
2232     //if (myrank != 0) connect_to(0);
2233 }
2234
2235 static void _init_static_smsg()
2236 {
2237     gni_smsg_attr_t      *smsg_attr;
2238     gni_smsg_attr_t      remote_smsg_attr;
2239     gni_smsg_attr_t      *smsg_attr_vec;
2240     gni_mem_handle_t     my_smsg_mdh_mailbox;
2241     int      ret, i;
2242     gni_return_t status;
2243     uint32_t              vmdh_index = -1;
2244     mdh_addr_t            base_infor;
2245     mdh_addr_t            *base_addr_vec;
2246     char *env;
2247
2248     if(mysize <=512)
2249     {
2250         SMSG_MAX_MSG = 1024;
2251     }else if (mysize <= 4096)
2252     {
2253         SMSG_MAX_MSG = 1024;
2254     }else if (mysize <= 16384)
2255     {
2256         SMSG_MAX_MSG = 512;
2257     }else {
2258         SMSG_MAX_MSG = 256;
2259     }
2260     
2261     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2262     if (env) SMSG_MAX_MSG = atoi(env);
2263     CmiAssert(SMSG_MAX_MSG > 0);
2264
2265     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2266     
2267     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2268     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2269     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2270     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2271     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2272     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2273     CmiAssert(ret == 0);
2274     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2275     
2276     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2277             smsg_memlen*(mysize), smsg_rx_cqh,
2278             GNI_MEM_READWRITE,   
2279             vmdh_index,
2280             &my_smsg_mdh_mailbox);
2281     register_memory_size += smsg_memlen*(mysize);
2282     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2283
2284     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2285
2286     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2287     base_infor.mdh =  my_smsg_mdh_mailbox;
2288     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2289
2290     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2291  
2292     for(i=0; i<mysize; i++)
2293     {
2294         if(i==myrank)
2295             continue;
2296         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2297         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2298         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2299         smsg_attr[i].mbox_offset = i*smsg_memlen;
2300         smsg_attr[i].buff_size = smsg_memlen;
2301         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2302         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2303     }
2304
2305     for(i=0; i<mysize; i++)
2306     {
2307         if (myrank == i) continue;
2308
2309         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2310         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2311         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2312         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2313         remote_smsg_attr.buff_size = smsg_memlen;
2314         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2315         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2316
2317         /* initialize the smsg channel */
2318         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2319         GNI_RC_CHECK("SMSG Init", status);
2320     } //end initialization
2321
2322     free(base_addr_vec);
2323     free(smsg_attr);
2324
2325     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2326     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2327
2328
2329 inline
2330 static void _init_send_queue(SMSG_QUEUE *queue)
2331 {
2332      int i;
2333      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2334      for(i =0; i<mysize; i++)
2335      {
2336         queue->smsg_msglist_index[i].next = -1;
2337 #if CMK_SMP
2338         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2339 #else
2340         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2341 #endif
2342         
2343      }
2344      queue->smsg_head_index = -1;
2345 }
2346
2347 inline
2348 static void _init_smsg()
2349 {
2350     if(mysize > 1) {
2351         if (useDynamicSMSG)
2352             _init_dynamic_smsg();
2353         else
2354             _init_static_smsg();
2355     }
2356
2357     _init_send_queue(&smsg_queue);
2358 #if CMK_USE_OOB
2359     _init_send_queue(&smsg_oob_queue);
2360 #endif
2361 }
2362
2363 static void _init_static_msgq()
2364 {
2365     gni_return_t status;
2366     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2367     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2368     msgq_attrs.smsg_q_sz = 1;
2369     msgq_attrs.rcv_pool_sz = 1;
2370     msgq_attrs.num_msgq_eps = 2;
2371     msgq_attrs.nloc_insts = 8;
2372     msgq_attrs.modes = 0;
2373     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2374
2375     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2376     GNI_RC_CHECK("MSGQ Init", status);
2377
2378
2379 }
2380
2381 #if CMK_SMP && STEAL_MEMPOOL
2382 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2383 {
2384     void *pool = NULL;
2385     int i, k;
2386     // check other ranks
2387     for (k=0; k<CmiMyNodeSize()+1; k++) {
2388         i = (CmiMyRank()+k)%CmiMyNodeSize();
2389         if (i==CmiMyRank()) continue;
2390         mempool_type *mptr = CpvAccessOther(mempool, i);
2391         CmiLock(mptr->mempoolLock);
2392         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2393         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2394             CmiUnlock(mptr->mempoolLock);
2395             continue;
2396         }
2397         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2398         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2399             /* search in the free list */
2400           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2401           mempool_header *current = free_header;
2402           while (current) {
2403             if (current->next_free == (char*)header-(char*)mptr) break;
2404             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2405           }
2406           if (current == NULL) {         /* not found in free list */
2407             CmiUnlock(mptr->mempoolLock);
2408             continue;
2409           }
2410 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2411             /* search the previous memblock, and remove the tail */
2412           mempool_block *ptr = (mempool_block *)mptr;
2413           while (ptr) {
2414             if (ptr->memblock_next == mptr->memblock_tail) break;
2415             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2416           }
2417           CmiAssert(ptr!=NULL);
2418           ptr->memblock_next = 0;
2419           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2420
2421             /* remove memblock from the free list */
2422           current->next_free = header->next_free;
2423           if (header == free_header) mptr->freelist_head = header->next_free;
2424
2425           CmiUnlock(mptr->mempoolLock);
2426
2427           pool = (void*)tail;
2428           *mem_hndl = tail->mem_hndl;
2429           *size = tail->size;
2430           return pool;
2431         }
2432         CmiUnlock(mptr->mempoolLock);
2433     }
2434
2435       /* steal failed, deregister and free memblock now */
2436     int freed = 0;
2437     for (k=0; k<CmiMyNodeSize()+1; k++) {
2438         i = (CmiMyRank()+k)%CmiMyNodeSize();
2439         mempool_type *mptr = CpvAccessOther(mempool, i);
2440         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2441
2442         mempool_block *mempools_head = &(mptr->mempools_head);
2443         mempool_block *current = mempools_head;
2444         mempool_block *prev = NULL;
2445
2446         while (current) {
2447           int isfree = 0;
2448           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2449 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2450           mempool_header *cur = free_header;
2451           mempool_header *header;
2452           if (current != mempools_head) {
2453             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2454              /* search in free list */
2455             if (header->size == current->size - sizeof(mempool_block)) {
2456               cur = free_header;
2457               while (cur) {
2458                 if (cur->next_free == (char*)header-(char*)mptr) break;
2459                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2460               }
2461               if (cur != NULL) isfree = 1;
2462             }
2463           }
2464           if (isfree) {
2465               /* remove from free list */
2466             cur->next_free = header->next_free;
2467             if (header == free_header) mptr->freelist_head = header->next_free;
2468              // deregister
2469             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2470             GNI_RC_CHECK("steal Mempool de-register", status);
2471             mempool_block *ptr = current;
2472             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2473             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2474 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2475             freed += ptr->size;
2476             free(ptr);
2477              // try now
2478             if (freed > *size) {
2479               if (pool == NULL) {
2480                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2481                 CmiAssert(ret == 0);
2482               }
2483               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2484               if (status == GNI_RC_SUCCESS) {
2485                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2486 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2487                 return pool;
2488               }
2489 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2490             }
2491           }
2492           else {
2493              prev = current;
2494              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2495           }
2496         }
2497
2498         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2499     }
2500       /* still no luck registering pool */
2501     if (pool) free(pool);
2502     return NULL;
2503 }
2504 #endif
2505
2506 static long long int total_mempool_size = 0;
2507 static long long int total_mempool_calls = 0;
2508
2509 #if USE_LRTS_MEMPOOL
2510 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2511 {
2512     void *pool;
2513     int ret;
2514
2515     int default_size =  expand_flag? _expand_mem : _mempool_size;
2516     if (*size < default_size) *size = default_size;
2517     total_mempool_size += *size;
2518     total_mempool_calls += 1;
2519     ret = posix_memalign(&pool, ALIGNBUF, *size);
2520     if (ret != 0) {
2521 #if CMK_SMP && STEAL_MEMPOOL
2522       pool = steal_mempool_block(size, mem_hndl);
2523       if (pool != NULL) return pool;
2524 #endif
2525       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2526       if (ret == ENOMEM)
2527         CmiAbort("alloc_mempool_block: out of memory.");
2528       else
2529         CmiAbort("alloc_mempool_block: posix_memalign failed");
2530     }
2531         SetMemHndlZero((*mem_hndl));
2532     
2533     return pool;
2534 }
2535
2536 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2537 {
2538     if(!(IsMemHndlZero(mem_hndl)))
2539     {
2540         gni_return_t status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2541         GNI_RC_CHECK("free_mempool_block Mempool de-register", status);
2542     }
2543     free(ptr);
2544 }
2545 #endif
2546
2547 void LrtsPreCommonInit(int everReturn){
2548 #if USE_LRTS_MEMPOOL
2549     CpvInitialize(mempool_type*, mempool);
2550     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block);
2551     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2552 #endif
2553 }
2554
2555
2556 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2557 {
2558     register int            i;
2559     int                     rc;
2560     int                     device_id = 0;
2561     unsigned int            remote_addr;
2562     gni_cdm_handle_t        cdm_hndl;
2563     gni_return_t            status = GNI_RC_SUCCESS;
2564     uint32_t                vmdh_index = -1;
2565     uint8_t                 ptag;
2566     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2567     int                     first_spawned;
2568     int                     physicalID;
2569     char                   *env;
2570
2571     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2572     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2573     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2574    
2575     status = PMI_Init(&first_spawned);
2576     GNI_RC_CHECK("PMI_Init", status);
2577
2578     status = PMI_Get_size(&mysize);
2579     GNI_RC_CHECK("PMI_Getsize", status);
2580
2581     status = PMI_Get_rank(&myrank);
2582     GNI_RC_CHECK("PMI_getrank", status);
2583
2584     //physicalID = CmiPhysicalNodeID(myrank);
2585     
2586     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2587
2588     *myNodeID = myrank;
2589     *numNodes = mysize;
2590   
2591 #if !COMM_THREAD_SEND
2592     /* Currently, we only consider the case that comm. thread will only recv msgs */
2593     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2594 #endif
2595
2596     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2597     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2598     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2599
2600     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2601     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2602     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2603
2604     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
2605     if (env) useDynamicSMSG = 1;
2606     if (!useDynamicSMSG)
2607       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2608     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2609     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2610     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2611     
2612     if(myrank == 0)
2613     {
2614         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2615         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2616     }
2617 #ifdef USE_ONESIDED
2618     onesided_init(NULL, &onesided_hnd);
2619
2620     // this is a GNI test, so use the libonesided bypass functionality
2621     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2622     local_addr = gniGetNicAddress();
2623 #else
2624     ptag = get_ptag();
2625     cookie = get_cookie();
2626 #if 0
2627     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2628 #endif
2629     //Create and attach to the communication  domain */
2630     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2631     GNI_RC_CHECK("GNI_CdmCreate", status);
2632     //* device id The device id is the minor number for the device
2633     //that is assigned to the device by the system when the device is created.
2634     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2635     //where X is the device number 0 default 
2636     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2637     GNI_RC_CHECK("GNI_CdmAttach", status);
2638     local_addr = get_gni_nic_address(0);
2639 #endif
2640     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2641     _MEMCHECK(MPID_UGNI_AllAddr);
2642     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2643     /* create the local completion queue */
2644     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2645     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2646     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2647     
2648     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2649     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2650     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2651
2652     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2653     GNI_RC_CHECK("Create CQ (rx)", status);
2654     
2655     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2656     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2657     
2658     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2659     //GNI_RC_CHECK("Create BTE CQ", status);
2660
2661     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2662     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2663     _MEMCHECK(ep_hndl_array);
2664 #if CMK_SMP && !COMM_THREAD_SEND
2665     tx_cq_lock  = CmiCreateLock();
2666     rx_cq_lock  = CmiCreateLock();
2667 #endif
2668     for (i=0; i<mysize; i++) {
2669         if(i == myrank) continue;
2670         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2671         GNI_RC_CHECK("GNI_EpCreate ", status);   
2672         remote_addr = MPID_UGNI_AllAddr[i];
2673         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2674         GNI_RC_CHECK("GNI_EpBind ", status);   
2675     }
2676
2677     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2678     _init_smsg();
2679     PMI_Barrier();
2680
2681 #if     USE_LRTS_MEMPOOL
2682     env = getenv("CHARM_UGNI_MEMPOOL_SIZE");
2683     if (env) _mempool_size = CmiReadSize(env);
2684     if (CmiGetArgStringDesc(*argv,"+useMemorypoolSize",&env,"Set the memory pool size")) 
2685         _mempool_size = CmiReadSize(env);
2686
2687     if (myrank==0) printf("Charm++> memory pool size: %1.fMB\n", _mempool_size/1024.0/1024);
2688
2689     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2690     if (env) {
2691         MAX_REG_MEM = CmiReadSize(env);
2692         if (myrank==0) printf("Charm++> memory pool maximum size: %1.fMB\n", MAX_REG_MEM/1024.0/1024);
2693     }
2694
2695     env = getenv("CHARM_UGNI_SEND_MAX");
2696     if (env) {
2697         MAX_BUFF_SEND = CmiReadSize(env);
2698         if (myrank==0) printf("Charm++> maximum pending memory pool for sending: %1.fMB\n", MAX_BUFF_SEND/1024.0/1024);
2699     }
2700
2701     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2702     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2703 #endif
2704
2705     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
2706     if (env) _checkProgress = 0;
2707     if (mysize == 1) _checkProgress = 0;
2708
2709     /* init DMA buffer for medium message */
2710
2711     //_init_DMA_buffer();
2712     
2713     free(MPID_UGNI_AllAddr);
2714 #if CMK_SMP
2715     sendRdmaBuf = PCQueueCreate();
2716 #else
2717     sendRdmaBuf = 0;
2718 #endif
2719 #if MACHINE_DEBUG_LOG
2720     char ln[200];
2721     sprintf(ln,"debugLog.%d",myrank);
2722     debugLog=fopen(ln,"w");
2723 #endif
2724
2725 }
2726
2727 void* LrtsAlloc(int n_bytes, int header)
2728 {
2729     void *ptr;
2730 #if 0
2731     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2732 #endif
2733     if(n_bytes <= SMSG_MAX_MSG)
2734     {
2735         int totalsize = n_bytes+header;
2736         ptr = malloc(totalsize);
2737     }
2738     else {
2739         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2740 #if     USE_LRTS_MEMPOOL
2741         n_bytes = ALIGN64(n_bytes);
2742         if(n_bytes <= BIG_MSG)
2743         {
2744             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2745             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2746         }else 
2747         {
2748             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2749             ptr = res + ALIGNBUF - header;
2750
2751         }
2752 #else
2753         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2754         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2755         ptr = res + ALIGNBUF - header;
2756 #endif
2757     }
2758     return ptr;
2759 }
2760
2761 void  LrtsFree(void *msg)
2762 {
2763     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2764     if (size <= SMSG_MAX_MSG)
2765       free(msg);
2766     else if(size>BIG_MSG)
2767     {
2768         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2769
2770     }else
2771     {
2772 #if     USE_LRTS_MEMPOOL
2773 #if CMK_SMP
2774         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2775 #else
2776         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2777 #endif
2778 #else
2779         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2780 #endif
2781     }
2782 }
2783
2784 void LrtsExit()
2785 {
2786     /* free memory ? */
2787 #if USE_LRTS_MEMPOOL
2788     mempool_destroy(CpvAccess(mempool));
2789 #endif
2790     PMI_Finalize();
2791     exit(0);
2792 }
2793
2794 void LrtsDrainResources()
2795 {
2796     if(mysize == 1) return;
2797     while (
2798 #if CMK_USE_OOB
2799            !SendBufferMsg(&smsg_oob_queue) ||
2800 #endif
2801            !SendBufferMsg(&smsg_queue))
2802     {
2803         if (useDynamicSMSG)
2804             PumpDatagramConnection();
2805         PumpNetworkSmsg();
2806         PumpLocalRdmaTransactions();
2807         SendRdmaMsg();
2808     }
2809     PMI_Barrier();
2810 }
2811
2812 void LrtsAbort(const char *message) {
2813     printf("CmiAbort is calling on PE:%d\n", myrank);
2814     CmiPrintStackTrace(0);
2815     PMI_Abort(-1, message);
2816 }
2817
2818 /**************************  TIMER FUNCTIONS **************************/
2819 #if CMK_TIMER_USE_SPECIAL
2820 /* MPI calls are not threadsafe, even the timer on some machines */
2821 static CmiNodeLock  timerLock = 0;
2822 static int _absoluteTime = 0;
2823 static int _is_global = 0;
2824 static struct timespec start_ts;
2825
2826 inline int CmiTimerIsSynchronized() {
2827     return 0;
2828 }
2829
2830 inline int CmiTimerAbsolute() {
2831     return _absoluteTime;
2832 }
2833
2834 double CmiStartTimer() {
2835     return 0.0;
2836 }
2837
2838 double CmiInitTime() {
2839     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
2840 }
2841
2842 void CmiTimerInit(char **argv) {
2843     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2844     if (_absoluteTime && CmiMyPe() == 0)
2845         printf("Charm++> absolute  timer is used\n");
2846     
2847     _is_global = CmiTimerIsSynchronized();
2848
2849
2850     if (_is_global) {
2851         if (CmiMyRank() == 0) {
2852             clock_gettime(CLOCK_MONOTONIC, &start_ts);
2853         }
2854     } else { /* we don't have a synchronous timer, set our own start time */
2855         CmiBarrier();
2856         CmiBarrier();
2857         CmiBarrier();
2858         clock_gettime(CLOCK_MONOTONIC, &start_ts);
2859     }
2860     CmiNodeAllBarrier();          /* for smp */
2861 }
2862
2863 /**
2864  * Since the timerLock is never created, and is
2865  * always NULL, then all the if-condition inside
2866  * the timer functions could be disabled right
2867  * now in the case of SMP.
2868  */
2869 double CmiTimer(void) {
2870     struct timespec now_ts;
2871     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2872     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2873         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2874 }
2875
2876 double CmiWallTimer(void) {
2877     struct timespec now_ts;
2878     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2879     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2880         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
2881 }
2882
2883 double CmiCpuTimer(void) {
2884     struct timespec now_ts;
2885     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2886     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2887         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2888 }
2889
2890 #endif
2891 /************Barrier Related Functions****************/
2892
2893 int CmiBarrier()
2894 {
2895     gni_return_t status;
2896
2897 #if CMK_SMP
2898     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2899     CmiNodeAllBarrier();
2900     if (CmiMyRank() == CmiMyNodeSize())
2901 #else
2902     if (CmiMyRank() == 0)
2903 #endif
2904     {
2905         /**
2906          *  The call of CmiBarrier is usually before the initialization
2907          *  of trace module of Charm++, therefore, the START_EVENT
2908          *  and END_EVENT are disabled here. -Chao Mei
2909          */
2910         /*START_EVENT();*/
2911         status = PMI_Barrier();
2912         GNI_RC_CHECK("PMI_Barrier", status);
2913         /*END_EVENT(10);*/
2914     }
2915     CmiNodeAllBarrier();
2916     return status;
2917
2918 }
2919 #if CMK_DIRECT
2920 #include "machine-cmidirect.c"
2921 #endif
2922 #if CMK_PERSISTENT_COMM
2923 #include "machine-persistent.c"
2924 #endif
2925
2926