fixed potential bugs with mem register
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     export CHARM_UGNI_MEMPOOL_SIZE=8M
12     export CHARM_UGNI_MEMPOOL_MAX=20M
13     export CHARM_UGNI_SEND_MAX=10M
14
15     CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
16     CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
17
18     other environment variables:
19
20     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes      # disable checking deadlock
21  */
22 /*@{*/
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <stdint.h>
27 #include <errno.h>
28 #include <malloc.h>
29 #include <unistd.h>
30
31 #include <gni_pub.h>
32 #include <pmi.h>
33 #include <time.h>
34
35 #include "converse.h"
36
37 #define PRINT_SYH  0
38
39 // Trace communication thread
40 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
41 #define TRACE_THRESHOLD     0.00005
42 #define CMI_MPI_TRACE_MOREDETAILED 0
43 #undef CMI_MPI_TRACE_USEREVENTS
44 #define CMI_MPI_TRACE_USEREVENTS 1
45 #else
46 #undef CMK_SMP_TRACE_COMMTHREAD
47 #define CMK_SMP_TRACE_COMMTHREAD 0
48 #endif
49
50 #define CMK_TRACE_COMMOVERHEAD 0
51 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
52 #undef CMI_MPI_TRACE_USEREVENTS
53 #define CMI_MPI_TRACE_USEREVENTS 1
54 #else
55 #undef CMK_TRACE_COMMOVERHEAD
56 #define CMK_TRACE_COMMOVERHEAD 0
57 #endif
58
59 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
60 CpvStaticDeclare(double, projTraceStart);
61 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
62 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
63 #else
64 #define  START_EVENT()
65 #define  END_EVENT(x)
66 #endif
67
68 #define CMI_EXERT_SEND_CAP      0
69 #define CMI_EXERT_RECV_CAP      0
70
71 #if CMI_EXERT_SEND_CAP
72 #define SEND_CAP 16
73 #endif
74
75 #if CMI_EXERT_RECV_CAP
76 #define RECV_CAP 2
77 #endif
78
79 static int _checkProgress = 1;             /* check deadlock */
80 static int _detected_hang = 0;
81
82 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
83
84 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
85
86 static int avg_smsg_connection = 32;
87 static int                 *smsg_connected_flag= 0;
88 static gni_smsg_attr_t     **smsg_attr_vector_local;
89 static gni_smsg_attr_t     **smsg_attr_vector_remote;
90 static gni_ep_handle_t     ep_hndl_unbound;
91 static gni_smsg_attr_t     send_smsg_attr;
92 static gni_smsg_attr_t     recv_smsg_attr;
93
94 typedef struct _dynamic_smsg_mailbox{
95    void     *mailbox_base;
96    int      size;
97    int      offset;
98    gni_mem_handle_t  mem_hndl;
99    struct      _dynamic_smsg_mailbox  *next;
100 }dynamic_smsg_mailbox_t;
101
102 static dynamic_smsg_mailbox_t  *mailbox_list;
103
104 #define REMOTE_EVENT                      0
105 #define USE_LRTS_MEMPOOL                  1
106
107 #if USE_LRTS_MEMPOOL
108
109 #define oneMB (1024ll*1024)
110 static CmiInt8 _mempool_size = 8*oneMB;
111 static CmiInt8 _expand_mem =  4*oneMB;
112
113 #endif
114
115 #if CMK_SMP
116 //Dynamic flow control about memory registration
117 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
118 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
119 #else
120 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
121 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
122 #endif
123
124 static CmiInt8 buffered_send_msg = 0;
125
126 #define BIG_MSG                  16*oneMB
127 #define ONE_SEG                  4*oneMB
128 #define BIG_MSG_PIPELINE         4
129
130 #if CMK_SMP
131 #define COMM_THREAD_SEND 1
132 #endif
133
134 int         rdma_id = 0;
135
136 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
137 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
138
139 #if PRINT_SYH
140 int         lrts_send_msg_id = 0;
141 int         lrts_local_done_msg = 0;
142 int         lrts_smsg_success = 0;
143 int         lrts_send_rdma_success = 0;
144 #endif
145
146 #include "machine.h"
147
148 #include "pcqueue.h"
149
150 #include "mempool.h"
151
152 #if CMK_PERSISTENT_COMM
153 #include "machine-persistent.h"
154 #endif
155
156 //#define  USE_ONESIDED 1
157 #ifdef USE_ONESIDED
158 //onesided implementation is wrong, since no place to restore omdh
159 #include "onesided.h"
160 onesided_hnd_t   onesided_hnd;
161 onesided_md_t    omdh;
162 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
163
164 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
165
166 #else
167 uint8_t   onesided_hnd, omdh;
168 #if REMOTE_EVENT
169 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size>= MAX_REG_MEM) { \
170                                                                                          status = GNI_RC_ERROR_NOMEM;} \
171         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
172                 if(status == GNI_RC_SUCCESS) register_memory_size += size;} 
173 #else
174 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size>= MAX_REG_MEM) { \
175                                                                                          status = GNI_RC_ERROR_NOMEM;} \
176         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
177                 if(status == GNI_RC_SUCCESS) register_memory_size += size;} 
178 #endif
179 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  if( GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) register_memory_size -= size; \
180                                                                                                                            else CmiAbort("MEM_DEregister");;
181 #endif
182
183 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
184 #define   DecreaseMsgInRecv(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)--
185 #define   IncreaseMsgInSend(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)++
186                                    //printf("++++[%d]%p   ----- %d\n", myrank, ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
187 #define   DecreaseMsgInSend(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)--
188                                      //printf("---[%d]%p   ----- %d\n", myrank,((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
189 #define   GetMempoolBlockPtr(x)  ((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr
190 #define   GetMempoolPtr(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mptr
191 #define   GetMempoolsize(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->size
192 #define   GetMemHndl(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mem_hndl
193 #define   GetMemHndlFromHeader(x) ((block_header*)x)->mem_hndl
194 #define   GetSizeFromHeader(x) ((block_header*)x)->size
195 #define   NoMsgInSend(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send == 0
196 #define   NoMsgInRecv(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv == 0
197 #define   NoMsgInFlight(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send + ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv  == 0
198 #define   IsMemHndlZero(x)  ((x).qword1 == 0 && (x).qword2 == 0)
199 #define   SetMemHndlZero(x)  (x).qword1 = 0; (x).qword2 = 0 ;
200 #define   NotRegistered(x)  IsMemHndlZero(((block_header*)x)->mem_hndl)
201
202 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
203 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
204 #define CmiGetMsgSeq(m)  ((CmiMsgHeaderExt*)m)->seq
205 #define CmiSetMsgSeq(m, s)  ((((CmiMsgHeaderExt*)m)->seq) = (s))
206
207 #define ALIGNBUF                64
208
209 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
210 /* If SMSG is not used */
211
212 #define FMA_PER_CORE  1024
213 #define FMA_BUFFER_SIZE 1024
214
215 /* If SMSG is used */
216 static int  SMSG_MAX_MSG = 1024;
217 #define SMSG_MAX_CREDIT 72 
218
219 #define MSGQ_MAXSIZE       2048
220 /* large message transfer with FMA or BTE */
221 #define LRTS_GNI_RDMA_THRESHOLD  1024 
222
223 #if CMK_SMP
224 static int  REMOTE_QUEUE_ENTRIES=163840; 
225 static int LOCAL_QUEUE_ENTRIES=163840; 
226 #else
227 static int  REMOTE_QUEUE_ENTRIES=20480;
228 static int LOCAL_QUEUE_ENTRIES=20480; 
229 #endif
230
231 #define BIG_MSG_TAG  0x26
232 #define PUT_DONE_TAG      0x28
233 #define DIRECT_PUT_DONE_TAG      0x29
234 #define ACK_TAG           0x30
235 /* SMSG is data message */
236 #define SMALL_DATA_TAG          0x31
237 /* SMSG is a control message to initialize a BTE */
238 #define MEDIUM_HEAD_TAG         0x32
239 #define MEDIUM_DATA_TAG         0x33
240 #define LMSG_INIT_TAG           0x39 
241 #define VERY_LMSG_INIT_TAG      0x40 
242 #define VERY_LMSG_TAG           0x41 
243
244 #define DEBUG
245 #ifdef GNI_RC_CHECK
246 #undef GNI_RC_CHECK
247 #endif
248 #ifdef DEBUG
249 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
250 #else
251 #define GNI_RC_CHECK(msg,rc)
252 #endif
253
254 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
255 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
256
257 static int useStaticMSGQ = 0;
258 static int useStaticFMA = 0;
259 static int mysize, myrank;
260 gni_nic_handle_t      nic_hndl;
261
262 typedef struct {
263     gni_mem_handle_t mdh;
264     uint64_t addr;
265 } mdh_addr_t ;
266 // this is related to dynamic SMSG
267
268 typedef struct mdh_addr_list{
269     gni_mem_handle_t mdh;
270    void *addr;
271     struct mdh_addr_list *next;
272 }mdh_addr_list_t;
273
274 static unsigned int         smsg_memlen;
275 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
276 mdh_addr_t          setup_mem;
277 mdh_addr_t          *smsg_connection_vec = 0;
278 gni_mem_handle_t    smsg_connection_memhndl;
279 static int          smsg_expand_slots = 10;
280 static int          smsg_available_slot = 0;
281 static void         *smsg_mailbox_mempool = 0;
282 mdh_addr_list_t     *smsg_dynamic_list = 0;
283
284 static void             *smsg_mailbox_base;
285 gni_msgq_attr_t         msgq_attrs;
286 gni_msgq_handle_t       msgq_handle;
287 gni_msgq_ep_attr_t      msgq_ep_attrs;
288 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
289
290 /* =====Beginning of Declarations of Machine Specific Variables===== */
291 static int cookie;
292 static int modes = 0;
293 static gni_cq_handle_t       smsg_rx_cqh = NULL;
294 static gni_cq_handle_t       smsg_tx_cqh = NULL;
295 static gni_cq_handle_t       post_rx_cqh = NULL;
296 static gni_cq_handle_t       post_tx_cqh = NULL;
297 static gni_ep_handle_t       *ep_hndl_array;
298 #if CMK_SMP && !COMM_THREAD_SEND
299 static CmiNodeLock           *ep_lock_array;
300 static CmiNodeLock           tx_cq_lock; 
301 static CmiNodeLock           rx_cq_lock; 
302 #endif
303 typedef struct msg_list
304 {
305     uint32_t destNode;
306     uint32_t size;
307     void *msg;
308     uint8_t tag;
309 #if !CMK_SMP
310     struct msg_list *next;
311 #endif
312 }MSG_LIST;
313
314
315 typedef struct control_msg
316 {
317     uint64_t            source_addr;    /* address from the start of buffer  */
318     uint64_t            dest_addr;      /* address from the start of buffer */
319     //int                 source;         /* source rank */
320     int                 total_length;   /* total length */
321     int                 length;         /* length of this packet */
322     uint8_t             seq_id;         //big message   0 meaning single message
323     gni_mem_handle_t    source_mem_hndl;
324     struct control_msg *next;
325 }CONTROL_MSG;
326
327 #ifdef CMK_DIRECT
328 typedef struct{
329     void *recverBuf;
330     void (*callbackFnPtr)(void *);
331     void *callbackData;
332 }CMK_DIRECT_HEADER;
333 #endif
334 typedef struct  rmda_msg
335 {
336     int                   destNode;
337     gni_post_descriptor_t *pd;
338 #if !CMK_SMP
339     struct  rmda_msg      *next;
340 #endif
341 }RDMA_REQUEST;
342
343 #if CMK_SMP
344 PCQueue sendRdmaBuf;
345 typedef struct  msg_list_index
346 {
347     int         next;
348     PCQueue     sendSmsgBuf;
349 } MSG_LIST_INDEX;
350 #else
351 static RDMA_REQUEST        *sendRdmaBuf = 0;
352 static RDMA_REQUEST        *sendRdmaTail = 0;
353 typedef struct  msg_list_index
354 {
355     int         next;
356     MSG_LIST    *sendSmsgBuf;
357     MSG_LIST    *tail;
358 } MSG_LIST_INDEX;
359 #endif
360
361 /* reuse PendingMsg memory */
362 static CONTROL_MSG          *control_freelist=0;
363 static MSG_LIST             *msglist_freelist=0;
364
365 typedef struct smsg_queue
366 {
367     MSG_LIST_INDEX   *smsg_msglist_index;
368     int               smsg_head_index;
369 } SMSG_QUEUE;
370
371 SMSG_QUEUE                  smsg_queue;
372 SMSG_QUEUE                  smsg_oob_queue;
373
374 #if CMK_SMP
375
376 #define FreeMsgList(d)   free(d);
377 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
378
379 #else
380
381 #define FreeMsgList(d)  \
382   (d)->next = msglist_freelist;\
383   msglist_freelist = d;
384
385 #define MallocMsgList(d) \
386   d = msglist_freelist;\
387   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
388              _MEMCHECK(d);\
389   } else msglist_freelist = d->next; \
390   d->next =0;
391 #endif
392
393 #if CMK_SMP
394
395 #define FreeControlMsg(d)      free(d);
396 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
397
398 #else
399
400 #define FreeControlMsg(d)       \
401   (d)->next = control_freelist;\
402   control_freelist = d;
403
404 #define MallocControlMsg(d) \
405   d = control_freelist;\
406   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
407              _MEMCHECK(d);\
408   } else control_freelist = d->next;
409
410 #endif
411
412 static RDMA_REQUEST         *rdma_freelist = NULL;
413
414 #define FreeMediumControlMsg(d)       \
415   (d)->next = medium_control_freelist;\
416   medium_control_freelist = d;
417
418
419 #define MallocMediumControlMsg(d) \
420     d = medium_control_freelist;\
421     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
422     _MEMCHECK(d);\
423 } else mediumcontrol_freelist = d->next;
424
425 # if CMK_SMP
426 #define FreeRdmaRequest(d)       free(d);
427 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
428 #else
429
430 #define FreeRdmaRequest(d)       \
431   (d)->next = rdma_freelist;\
432   rdma_freelist = d;
433
434 #define MallocRdmaRequest(d) \
435   d = rdma_freelist;\
436   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
437              _MEMCHECK(d);\
438   } else rdma_freelist = d->next; \
439     d->next =0;
440 #endif
441
442 /* reuse gni_post_descriptor_t */
443 static gni_post_descriptor_t *post_freelist=0;
444
445 #if  !CMK_SMP
446 #define FreePostDesc(d)       \
447     (d)->next_descr = post_freelist;\
448     post_freelist = d;
449
450 #define MallocPostDesc(d) \
451   d = post_freelist;\
452   if (d==0) { \
453      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
454      _MEMCHECK(d);\
455   } else post_freelist = d->next_descr;
456 #else
457
458 #define FreePostDesc(d)     free(d);
459 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
460
461 #endif
462
463
464 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
465 static int      buffered_smsg_counter = 0;
466
467 /* SmsgSend return success but message sent is not confirmed by remote side */
468 static MSG_LIST *buffered_fma_head = 0;
469 static MSG_LIST *buffered_fma_tail = 0;
470
471 /* functions  */
472 #define IsFree(a,ind)  !( a& (1<<(ind) ))
473 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
474 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
475
476 CpvDeclare(mempool_type*, mempool);
477
478 /* get the upper bound of log 2 */
479 int mylog2(int size)
480 {
481     int op = size;
482     unsigned int ret=0;
483     unsigned int mask = 0;
484     int i;
485     while(op>0)
486     {
487         op = op >> 1;
488         ret++;
489
490     }
491     for(i=1; i<ret; i++)
492     {
493         mask = mask << 1;
494         mask +=1;
495     }
496
497     ret -= ((size &mask) ? 0:1);
498     return ret;
499 }
500
501 static void
502 allgather(void *in,void *out, int len)
503 {
504     static int *ivec_ptr=NULL,already_called=0,job_size=0;
505     int i,rc;
506     int my_rank;
507     char *tmp_buf,*out_ptr;
508
509     if(!already_called) {
510
511         rc = PMI_Get_size(&job_size);
512         CmiAssert(rc == PMI_SUCCESS);
513         rc = PMI_Get_rank(&my_rank);
514         CmiAssert(rc == PMI_SUCCESS);
515
516         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
517         CmiAssert(ivec_ptr != NULL);
518
519         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
520         CmiAssert(rc == PMI_SUCCESS);
521
522         already_called = 1;
523
524     }
525
526     tmp_buf = (char *)malloc(job_size * len);
527     CmiAssert(tmp_buf);
528
529     rc = PMI_Allgather(in,tmp_buf,len);
530     CmiAssert(rc == PMI_SUCCESS);
531
532     out_ptr = out;
533
534     for(i=0;i<job_size;i++) {
535
536         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
537
538     }
539
540     free(tmp_buf);
541 }
542
543 static void
544 allgather_2(void *in,void *out, int len)
545 {
546     //PMI_Allgather is out of order
547     int i,rc, extend_len;
548     int  rank_index;
549     char *out_ptr, *out_ref;
550     char *in2;
551
552     extend_len = sizeof(int) + len;
553     in2 = (char*)malloc(extend_len);
554
555     memcpy(in2, &myrank, sizeof(int));
556     memcpy(in2+sizeof(int), in, len);
557
558     out_ptr = (char*)malloc(mysize*extend_len);
559
560     rc = PMI_Allgather(in2, out_ptr, extend_len);
561     GNI_RC_CHECK("allgather", rc);
562
563     out_ref = out;
564
565     for(i=0;i<mysize;i++) {
566         //rank index 
567         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
568         //copy to the rank index slot
569         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
570     }
571
572     free(out_ptr);
573     free(in2);
574
575 }
576
577 static unsigned int get_gni_nic_address(int device_id)
578 {
579     unsigned int address, cpu_id;
580     gni_return_t status;
581     int i, alps_dev_id=-1,alps_address=-1;
582     char *token, *p_ptr;
583
584     p_ptr = getenv("PMI_GNI_DEV_ID");
585     if (!p_ptr) {
586         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
587        
588         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
589     } else {
590         while ((token = strtok(p_ptr,":")) != NULL) {
591             alps_dev_id = atoi(token);
592             if (alps_dev_id == device_id) {
593                 break;
594             }
595             p_ptr = NULL;
596         }
597         CmiAssert(alps_dev_id != -1);
598         p_ptr = getenv("PMI_GNI_LOC_ADDR");
599         CmiAssert(p_ptr != NULL);
600         i = 0;
601         while ((token = strtok(p_ptr,":")) != NULL) {
602             if (i == alps_dev_id) {
603                 alps_address = atoi(token);
604                 break;
605             }
606             p_ptr = NULL;
607             ++i;
608         }
609         CmiAssert(alps_address != -1);
610         address = alps_address;
611     }
612     return address;
613 }
614
615 static uint8_t get_ptag(void)
616 {
617     char *p_ptr, *token;
618     uint8_t ptag;
619
620     p_ptr = getenv("PMI_GNI_PTAG");
621     CmiAssert(p_ptr != NULL);
622     token = strtok(p_ptr, ":");
623     ptag = (uint8_t)atoi(token);
624     return ptag;
625         
626 }
627
628 static uint32_t get_cookie(void)
629 {
630     uint32_t cookie;
631     char *p_ptr, *token;
632
633     p_ptr = getenv("PMI_GNI_COOKIE");
634     CmiAssert(p_ptr != NULL);
635     token = strtok(p_ptr, ":");
636     cookie = (uint32_t)atoi(token);
637
638     return cookie;
639 }
640
641 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
642 /* TODO: add any that are related */
643 /* =====End of Definitions of Message-Corruption Related Macros=====*/
644
645
646 #include "machine-lrts.h"
647 #include "machine-common-core.c"
648
649 /* Network progress function is used to poll the network when for
650    messages. This flushes receive buffers on some  implementations*/
651 #if CMK_MACHINE_PROGRESS_DEFINED
652 void CmiMachineProgressImpl() {
653 }
654 #endif
655
656 static void SendRdmaMsg();
657 static void PumpNetworkSmsg();
658 static void PumpLocalRdmaTransactions();
659 static int SendBufferMsg();
660 static     int register_memory_size = 0;
661
662 #if MACHINE_DEBUG_LOG
663 FILE *debugLog = NULL;
664 static CmiInt8 buffered_recv_msg = 0;
665 int         lrts_smsg_success = 0;
666 int         lrts_received_msg = 0;
667 #endif
668
669 static void sweep_mempool(mempool_type *mptr)
670 {
671     block_header *current = &(mptr->block_head);
672
673 printf("[n %d] sweep_mempool slot START .\n", myrank);
674     while( current!= NULL) {
675 printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
676         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
677     }
678 printf("[n %d] sweep_mempool slot END .\n", myrank);
679 }
680
681 inline
682 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
683 {
684     gni_return_t status;
685     block_header *current = *from;
686
687     //while(register_memory_size>= MAX_REG_MEM)
688     //{
689         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
690             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
691
692         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
693         *from = current;
694         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh, GetSizeFromHeader(current));
695         SetMemHndlZero(GetMemHndlFromHeader(current));
696     //}
697     return GNI_RC_SUCCESS;
698 }
699
700 inline 
701 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
702 {
703     gni_return_t status = GNI_RC_SUCCESS;
704     //int size = GetMempoolsize(msg);
705     //void *blockaddr = GetMempoolBlockPtr(msg);
706     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
707    
708     block_header *current = &(mptr->block_head);
709     while(register_memory_size>= MAX_REG_MEM)
710     {
711         status = deregisterMemory(mptr, &current);
712         if (status != GNI_RC_SUCCESS) break;
713     }
714     if(register_memory_size>= MAX_REG_MEM) return status;
715
716     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
717     while(1)
718     {
719         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status)
720         if(status == GNI_RC_SUCCESS)
721         {
722             break;
723         }
724         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
725         {
726             CmiAbort("Memory registor for mempool fails\n");
727         }
728         else
729         {
730             status = deregisterMemory(mptr, &current);
731             if (status != GNI_RC_SUCCESS) break;
732         }
733     }; 
734     return status;
735 }
736
737 inline 
738 static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
739 {
740     static int rank = -1;
741     int i;
742     gni_return_t status;
743     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
744     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
745     mempool_type *mptr;
746
747     status = registerFromMempool(mptr1, msg, size, t);
748     if (status == GNI_RC_SUCCESS) return status;
749 #if CMK_SMP
750     for (i=0; i<CmiMyNodeSize()+1; i++) {
751       rank = (rank+1)%(CmiMyNodeSize()+1);
752       mptr = CpvAccessOther(mempool, rank);
753       if (mptr == mptr1) continue;
754       status = registerFromMempool(mptr, msg, size, t);
755       if (status == GNI_RC_SUCCESS) return status;
756     }
757 #endif
758     return  GNI_RC_ERROR_RESOURCE;
759 }
760
761 inline
762 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
763 {
764     MSG_LIST        *msg_tmp;
765     MallocMsgList(msg_tmp);
766     msg_tmp->destNode = destNode;
767     msg_tmp->size   = size;
768     msg_tmp->msg    = msg;
769     msg_tmp->tag    = tag;
770
771 #if !CMK_SMP
772     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
773         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
774         queue->smsg_head_index = destNode;
775         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
776     }else
777     {
778         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
779         queue->smsg_msglist_index[destNode].tail = msg_tmp;
780     }
781 #else
782     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
783 #endif
784 #if PRINT_SYH
785     buffered_smsg_counter++;
786 #endif
787 }
788
789 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
790 {
791     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
792 }
793
794 inline
795 static void setup_smsg_connection(int destNode)
796 {
797     mdh_addr_list_t  *new_entry = 0;
798     gni_post_descriptor_t *pd;
799     gni_smsg_attr_t      *smsg_attr;
800     gni_return_t status = GNI_RC_NOT_DONE;
801     RDMA_REQUEST        *rdma_request_msg;
802     
803     if(smsg_available_slot == smsg_expand_slots)
804     {
805         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
806         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
807         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
808
809         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
810             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
811             GNI_MEM_READWRITE,   
812             -1,
813             &(new_entry->mdh));
814         smsg_available_slot = 0; 
815         new_entry->next = smsg_dynamic_list;
816         smsg_dynamic_list = new_entry;
817     }
818     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
819     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
820     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
821     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
822     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
823     smsg_attr->buff_size = smsg_memlen;
824     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
825     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
826     smsg_local_attr_vec[destNode] = smsg_attr;
827     smsg_available_slot++;
828     MallocPostDesc(pd);
829     pd->type            = GNI_POST_FMA_PUT;
830     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
831     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
832     pd->length          = sizeof(gni_smsg_attr_t);
833     pd->local_addr      = (uint64_t) smsg_attr;
834     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
835     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
836     pd->src_cq_hndl     = 0;
837     pd->rdma_mode       = 0;
838     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
839     print_smsg_attr(smsg_attr);
840     if(status == GNI_RC_ERROR_RESOURCE )
841     {
842         MallocRdmaRequest(rdma_request_msg);
843         rdma_request_msg->destNode = destNode;
844         rdma_request_msg->pd = pd;
845         /* buffer this request */
846     }
847 #if PRINT_SYH
848     if(status != GNI_RC_SUCCESS)
849        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
850     else
851         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
852 #endif
853 }
854
855 /* useDynamicSMSG */
856 inline 
857 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
858 {
859     gni_return_t status = GNI_RC_NOT_DONE;
860
861     if(mailbox_list->offset == mailbox_list->size)
862     {
863         dynamic_smsg_mailbox_t *new_mailbox_entry;
864         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
865         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
866         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
867         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
868         new_mailbox_entry->offset = 0;
869         
870         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
871             new_mailbox_entry->size, smsg_rx_cqh,
872             GNI_MEM_READWRITE,   
873             -1,
874             &(new_mailbox_entry->mem_hndl));
875
876         GNI_RC_CHECK("register", status);
877         new_mailbox_entry->next = mailbox_list;
878         mailbox_list = new_mailbox_entry;
879     }
880     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
881     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
882     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
883     local_smsg_attr->mbox_offset = mailbox_list->offset;
884     mailbox_list->offset += smsg_memlen;
885     local_smsg_attr->buff_size = smsg_memlen;
886     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
887     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
888 }
889
890 /* useDynamicSMSG */
891 inline 
892 static int connect_to(int destNode)
893 {
894     gni_return_t status = GNI_RC_NOT_DONE;
895     CmiAssert(smsg_connected_flag[destNode] == 0);
896     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
897     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
898     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
899     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
900             
901 #if CMK_SMP && !COMM_THREAD_SEND
902     //CmiLock(ep_lock_array[destNode]);
903     CmiLock(tx_cq_lock);
904 #endif
905     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
906 #if CMK_SMP && !COMM_THREAD_SEND
907     //CmiUnlock(ep_lock_array[destNode]);
908     CmiUnlock(tx_cq_lock);
909 #endif
910     if (status == GNI_RC_ERROR_RESOURCE) {
911       /* possibly destNode is making connection at the same time */
912       free(smsg_attr_vector_local[destNode]);
913       smsg_attr_vector_local[destNode] = NULL;
914       free(smsg_attr_vector_remote[destNode]);
915       smsg_attr_vector_remote[destNode] = NULL;
916       mailbox_list->offset -= smsg_memlen;
917       return 0;
918     }
919     GNI_RC_CHECK("GNI_Post", status);
920     smsg_connected_flag[destNode] = 1;
921     return 1;
922 }
923
924 //inline static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
925 inline static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
926 {
927     unsigned int          remote_address;
928     uint32_t              remote_id;
929     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
930     gni_smsg_attr_t       *smsg_attr;
931     gni_post_descriptor_t *pd;
932     gni_post_state_t      post_state;
933     char                  *real_data; 
934
935     if (useDynamicSMSG) {
936         switch (smsg_connected_flag[destNode]) {
937         case 0: 
938             connect_to(destNode);         /* continue to case 1 */
939         case 1:                           /* pending connection, do nothing */
940             status = GNI_RC_NOT_DONE;
941             if(inbuff ==0)
942                 buffer_small_msgs(queue, msg, size, destNode, tag);
943             return status;
944         }
945     }
946 #if CMK_SMP
947     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
948     {
949 #else
950     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
951     {
952 #endif
953 #if CMK_SMP && !COMM_THREAD_SEND
954         CmiLock(tx_cq_lock);
955 #endif
956         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
957 #if CMK_SMP && !COMM_THREAD_SEND
958         CmiUnlock(tx_cq_lock);
959 #endif
960         if(status == GNI_RC_SUCCESS)
961         {
962 #if CMK_SMP_TRACE_COMMTHREAD
963             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
964             { 
965                 START_EVENT();
966                 if ( tag == SMALL_DATA_TAG)
967                     real_data = (char*)msg; 
968                 else 
969                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
970                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
971             }
972 #endif
973             smsg_send_count ++;
974             return status;
975         }else
976             status = GNI_RC_ERROR_RESOURCE;
977     }
978     if(inbuff ==0)
979         buffer_small_msgs(queue, msg, size, destNode, tag);
980     return status;
981 }
982
983
984 inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
985 {
986     /* construct a control message and send */
987     CONTROL_MSG         *control_msg_tmp;
988     MallocControlMsg(control_msg_tmp);
989     control_msg_tmp->source_addr = (uint64_t)msg;
990     control_msg_tmp->seq_id    = seqno;
991     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
992 #if     USE_LRTS_MEMPOOL
993     if(size < BIG_MSG)
994     {
995         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
996     }
997     else
998     {
999         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1000         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1001         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1002     }
1003 #else
1004     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1005 #endif
1006     return control_msg_tmp;
1007 }
1008
1009 // Large message, send control to receiver, receiver register memory and do a GET, 
1010 // return 1 - send no success
1011 inline
1012 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1013 {
1014     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1015     uint32_t            vmdh_index  = -1;
1016     int                 size;
1017     int                 offset = 0;
1018     uint64_t            source_addr;
1019     int                 register_size; 
1020     void                *msg;
1021
1022     size    =   control_msg_tmp->total_length;
1023     source_addr = control_msg_tmp->source_addr;
1024     register_size = control_msg_tmp->length;
1025
1026 #if     USE_LRTS_MEMPOOL
1027     if( control_msg_tmp->seq_id == 0 ){
1028         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1029         {
1030             if(buffered_send_msg >= MAX_BUFF_SEND)
1031             {
1032                 if(!inbuff)
1033                     buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1034                 return GNI_RC_ERROR_NOMEM;
1035             }
1036             //register the corresponding mempool
1037             msg = (void*)source_addr;
1038             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1039             if(status == GNI_RC_SUCCESS)
1040             {
1041                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1042             }
1043         }else
1044         {
1045             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1046             status = GNI_RC_SUCCESS;
1047         }
1048         if(NoMsgInSend( control_msg_tmp->source_addr))
1049             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1050         else
1051             register_size = 0;
1052     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1053     {
1054         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1055         source_addr += offset;
1056         size = control_msg_tmp->length;
1057         
1058         if(IsMemHndlZero(control_msg_tmp->source_mem_hndl))
1059         {
1060             if(buffered_send_msg >= MAX_BUFF_SEND)
1061             {
1062                 if(!inbuff)
1063                     buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1064                 return GNI_RC_ERROR_NOMEM;
1065             }
1066             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1067             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1068         }
1069         else
1070         {
1071             status = GNI_RC_SUCCESS;
1072         }
1073         register_size = 0;  
1074     }
1075
1076     if(status == GNI_RC_SUCCESS)
1077     {
1078         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
1079         if(status == GNI_RC_SUCCESS)
1080         {
1081             buffered_send_msg += register_size;
1082             if(control_msg_tmp->seq_id == 0)
1083             {
1084                 IncreaseMsgInSend(source_addr);
1085             }
1086             FreeControlMsg(control_msg_tmp);
1087             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1088         }else
1089             status = GNI_RC_ERROR_RESOURCE;
1090
1091     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1092     {
1093         CmiAbort("Memory registor for large msg\n");
1094     }else 
1095     {
1096         status = GNI_RC_ERROR_NOMEM; 
1097         if(!inbuff)
1098             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1099     }
1100     return status;
1101 #else
1102     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1103     if(status == GNI_RC_SUCCESS)
1104     {
1105         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
1106         if(status == GNI_RC_SUCCESS)
1107         {
1108             FreeControlMsg(control_msg_tmp);
1109         }
1110     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1111     {
1112         CmiAbort("Memory registor for large msg\n");
1113     }else 
1114     {
1115         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1116     }
1117     return status;
1118 #endif
1119 }
1120
1121 inline void LrtsPrepareEnvelope(char *msg, int size)
1122 {
1123     CmiSetMsgSize(msg, size);
1124 }
1125
1126 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1127 {
1128     gni_return_t        status  =   GNI_RC_SUCCESS;
1129     uint8_t tag;
1130     CONTROL_MSG         *control_msg_tmp;
1131     int                 oob = ( mode & OUT_OF_BAND);
1132     SMSG_QUEUE          *queue;
1133
1134     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1135 #if CMK_USE_OOB
1136     queue = oob? &smsg_oob_queue : &smsg_queue;
1137 #else
1138     queue = &smsg_queue;
1139 #endif
1140
1141     LrtsPrepareEnvelope(msg, size);
1142
1143 #if PRINT_SYH
1144     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1145 #endif 
1146 #if CMK_SMP && COMM_THREAD_SEND
1147     if(size <= SMSG_MAX_MSG)
1148         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1149     else if (size < BIG_MSG) {
1150         control_msg_tmp =  construct_control_msg(size, msg, 0);
1151         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1152     }
1153     else {
1154           CmiSetMsgSeq(msg, 0);
1155           control_msg_tmp =  construct_control_msg(size, msg, 1);
1156           buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1157     }
1158 #else //non-smp, smp(worker sending)
1159     if(size <= SMSG_MAX_MSG)
1160     {
1161         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1162             CmiFree(msg);
1163     }
1164     else if (size < BIG_MSG) {
1165         control_msg_tmp =  construct_control_msg(size, msg, 0);
1166         send_large_messages(queue, destNode, control_msg_tmp, 0);
1167     }
1168     else {
1169 #if     USE_LRTS_MEMPOOL
1170         CmiSetMsgSeq(msg, 0);
1171         control_msg_tmp =  construct_control_msg(size, msg, 1);
1172         send_large_messages(queue, destNode, control_msg_tmp, 0);
1173 #else
1174         control_msg_tmp =  construct_control_msg(size, msg, 0);
1175         send_large_messages(queue, destNode, control_msg_tmp, 0);
1176 #endif
1177     }
1178 #endif
1179     return 0;
1180 }
1181
1182 static void    PumpDatagramConnection();
1183 static void registerUserTraceEvents() {
1184 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1185     traceRegisterUserEvent("setting up connections", 10);
1186     traceRegisterUserEvent("Receiving small msgs", 20);
1187     traceRegisterUserEvent("Release local transaction", 30);
1188     traceRegisterUserEvent("Sending buffered small msgs", 40);
1189     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1190 #endif
1191 }
1192
1193 static void ProcessDeadlock()
1194 {
1195     static CmiUInt8 *ptr = NULL;
1196     static CmiUInt8  last = 0, mysum, sum;
1197     static int count = 0;
1198     gni_return_t status;
1199     int i;
1200
1201 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1202     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1203     mysum = smsg_send_count + smsg_recv_count;
1204     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1205     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1206     GNI_RC_CHECK("PMI_Allgather", status);
1207     sum = 0;
1208     for (i=0; i<mysize; i++)  sum+= ptr[i];
1209     if (last == 0 || sum == last) 
1210         count++;
1211     else
1212         count = 0;
1213     last = sum;
1214     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1215     if (count == 2) { 
1216         /* detected twice, it is a real deadlock */
1217         if (myrank == 0)  {
1218             CmiPrintf("Charm++> network progress engine stalled, program may hang. Try set environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX to limit the registered memory usage.\n");
1219             CmiAbort("Fatal> Deadlock detected.");
1220         }
1221     }
1222     _detected_hang = 0;
1223 }
1224
1225 static void CheckProgress()
1226 {
1227     if (smsg_send_count == last_smsg_send_count &&
1228         smsg_recv_count == last_smsg_recv_count ) 
1229     {
1230         _detected_hang = 1;
1231 #if !CMK_SMP
1232         if (_detected_hang) ProcessDeadlock();
1233 #endif
1234
1235     }
1236     else {
1237         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1238         last_smsg_send_count = smsg_send_count;
1239         last_smsg_recv_count = smsg_recv_count;
1240         _detected_hang = 0;
1241     }
1242 }
1243
1244 void LrtsPostCommonInit(int everReturn)
1245 {
1246 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1247     CpvInitialize(double, projTraceStart);
1248     /* only PE 0 needs to care about registration (to generate sts file). */
1249     if (CmiMyPe() == 0) {
1250         registerMachineUserEventsFunction(&registerUserTraceEvents);
1251     }
1252 #endif
1253
1254 #if CMK_SMP
1255     CmiIdleState *s=CmiNotifyGetState();
1256     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1257     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1258 #else
1259     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1260     if (useDynamicSMSG)
1261     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1262 #endif
1263
1264     if (_checkProgress)
1265 #if CMK_SMP
1266     if (CmiMyRank() == 0)
1267 #endif
1268     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1269 }
1270
1271 /* this is called by worker thread */
1272 void LrtsPostNonLocal(){
1273 #if CMK_SMP
1274 #if !COMM_THREAD_SEND
1275     if(mysize == 1) return;
1276     PumpLocalRdmaTransactions();
1277     SendBufferMsg();
1278     SendRdmaMsg();
1279 #endif
1280 #endif
1281 }
1282
1283 /* useDynamicSMSG */
1284 static void    PumpDatagramConnection()
1285 {
1286     uint32_t          remote_address;
1287     uint32_t          remote_id;
1288     gni_return_t status;
1289     gni_post_state_t  post_state;
1290     uint64_t          datagram_id;
1291     int i;
1292
1293    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1294    {
1295        if (datagram_id >= mysize) {           /* bound endpoint */
1296            int pe = datagram_id - mysize;
1297 #if CMK_SMP && !COMM_THREAD_SEND
1298            CmiLock(tx_cq_lock);
1299 #endif
1300            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1301 #if CMK_SMP && !COMM_THREAD_SEND
1302            CmiUnlock(tx_cq_lock);
1303 #endif
1304            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1305            {
1306                CmiAssert(remote_id == pe);
1307                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1308                GNI_RC_CHECK("Dynamic SMSG Init", status);
1309 #if PRINT_SYH
1310                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1311 #endif
1312                CmiAssert(smsg_connected_flag[pe] == 1);
1313                smsg_connected_flag[pe] = 2;
1314            }
1315        }
1316        else {         /* unbound ep */
1317            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1318            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1319            {
1320                CmiAssert(remote_id<mysize);
1321                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1322                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1323                GNI_RC_CHECK("Dynamic SMSG Init", status);
1324 #if PRINT_SYH
1325                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1326 #endif
1327                smsg_connected_flag[remote_id] = 2;
1328
1329                alloc_smsg_attr(&send_smsg_attr);
1330                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1331                GNI_RC_CHECK("post unbound datagram", status);
1332            }
1333        }
1334    }
1335 }
1336
1337 /* pooling CQ to receive network message */
1338 static void PumpNetworkRdmaMsgs()
1339 {
1340     gni_cq_entry_t      event_data;
1341     gni_return_t        status;
1342
1343 }
1344
1345 inline 
1346 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1347 {
1348     RDMA_REQUEST        *rdma_request_msg;
1349     MallocRdmaRequest(rdma_request_msg);
1350     rdma_request_msg->destNode = inst_id;
1351     rdma_request_msg->pd = pd;
1352 #if CMK_SMP
1353     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1354 #else
1355     if(sendRdmaBuf == 0)
1356     {
1357         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1358     }else{
1359         sendRdmaTail->next = rdma_request_msg;
1360         sendRdmaTail =  rdma_request_msg;
1361     }
1362 #endif
1363
1364 }
1365 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1366 static void PumpNetworkSmsg()
1367 {
1368     uint64_t            inst_id;
1369     int                 ret;
1370     gni_cq_entry_t      event_data;
1371     gni_return_t        status, status2;
1372     void                *header;
1373     uint8_t             msg_tag;
1374     int                 msg_nbytes;
1375     void                *msg_data;
1376     gni_mem_handle_t    msg_mem_hndl;
1377     gni_smsg_attr_t     *smsg_attr;
1378     gni_smsg_attr_t     *remote_smsg_attr;
1379     int                 init_flag;
1380     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1381     uint64_t            source_addr;
1382     SMSG_QUEUE         *queue = &smsg_queue;
1383
1384 #if CMK_SMP && !COMM_THREAD_SEND
1385     while(1)
1386     {
1387         CmiLock(tx_cq_lock);
1388         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1389         CmiUnlock(tx_cq_lock);
1390         if(status != GNI_RC_SUCCESS)
1391             break;
1392 #else
1393     while ( (status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
1394     {
1395 #endif
1396         inst_id = GNI_CQ_GET_INST_ID(event_data);
1397         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1398 #if PRINT_SYH
1399         printf("[%d] PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
1400 #endif
1401         if (useDynamicSMSG) {
1402             /* subtle: smsg may come before connection is setup */
1403             while (smsg_connected_flag[inst_id] != 2) 
1404                PumpDatagramConnection();
1405         }
1406         msg_tag = GNI_SMSG_ANY_TAG;
1407 #if CMK_SMP && !COMM_THREAD_SEND
1408         while(1) {
1409             CmiLock(tx_cq_lock);
1410             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1411             CmiUnlock(tx_cq_lock);
1412             if (status != GNI_RC_SUCCESS)
1413                 break;
1414 #else
1415         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS){
1416 #endif
1417             /* copy msg out and then put into queue (small message) */
1418             switch (msg_tag) {
1419             case SMALL_DATA_TAG:
1420             {
1421                 START_EVENT();
1422                 msg_nbytes = CmiGetMsgSize(header);
1423                 msg_data    = CmiAlloc(msg_nbytes);
1424                 memcpy(msg_data, (char*)header, msg_nbytes);
1425 #if CMK_SMP_TRACE_COMMTHREAD
1426                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1427 #endif
1428                 handleOneRecvedMsg(msg_nbytes, msg_data);
1429                 break;
1430             }
1431             case LMSG_INIT_TAG:
1432             {
1433 #if PRINT_SYH
1434                 printf("[%d] from %d request for Large msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
1435 #endif
1436                 getLargeMsgRequest(header, inst_id);
1437                 break;
1438             }
1439             case ACK_TAG:   //msg fit into mempool
1440             {
1441                 /* Get is done, release message . Now put is not used yet*/
1442                 void *msg = (void*)(((CONTROL_MSG *)header)->source_addr);
1443 #if ! USE_LRTS_MEMPOOL
1444                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh, header_tmp->length);
1445 #else
1446                 DecreaseMsgInSend(msg);
1447 #endif
1448                 if(NoMsgInSend(msg))
1449                     buffered_send_msg -= GetMempoolsize(msg);
1450                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1451                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
1452                 break;
1453             }
1454             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1455             {
1456                 header_tmp = (CONTROL_MSG *) header;
1457                 void *msg = (void*)(header_tmp->source_addr);
1458                 int cur_seq = CmiGetMsgSeq(msg);
1459                 int offset = ONE_SEG*(cur_seq+1);
1460                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1461                 buffered_send_msg -= header_tmp->length;
1462                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1463                 if (remain_size < 0) remain_size = 0;
1464                 CmiSetMsgSize(msg, remain_size);
1465                 if(remain_size <= 0) //transaction done
1466                 {
1467                     CmiFree(msg);
1468                 }else if (header_tmp->total_length > offset)
1469                 {
1470                     CmiSetMsgSeq(msg, cur_seq+1);
1471                     MallocControlMsg(control_msg_tmp);
1472                     control_msg_tmp->source_addr = header_tmp->source_addr;
1473                     control_msg_tmp->dest_addr    = header_tmp->dest_addr;
1474                     control_msg_tmp->total_length   = header_tmp->total_length; 
1475                     control_msg_tmp->length         = header_tmp->total_length - offset;
1476                     SetMemHndlZero(control_msg_tmp->source_mem_hndl); 
1477                     if (control_msg_tmp->length >= ONE_SEG) control_msg_tmp->length = ONE_SEG;
1478                     control_msg_tmp->seq_id         = cur_seq+1+1;
1479                     //send next seg
1480                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1481                          // pipelining
1482                     if (header_tmp->seq_id == 1) {
1483                       int i;
1484                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1485                         int seq = cur_seq+i+2;
1486                         CmiSetMsgSeq(msg, seq-1);
1487                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1488                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1489                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1490                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1491                       }
1492                     }
1493                 }
1494                 break;
1495             }
1496 #if CMK_PERSISTENT_COMM
1497             case PUT_DONE_TAG: //persistent message
1498                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1499                 int size = ((CONTROL_MSG *) header)->length;
1500                 CmiReference(msg);
1501                 handleOneRecvedMsg(size, msg); 
1502                 break;
1503 #endif
1504 #ifdef CMK_DIRECT
1505             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1506                 (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1507                 break;
1508 #endif
1509             default: {
1510                 printf("weird tag problem\n");
1511                 CmiAbort("Unknown tag\n");
1512                      }
1513             }               // end switch
1514 #if CMK_SMP && !COMM_THREAD_SEND
1515             CmiLock(tx_cq_lock);
1516 #endif
1517             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1518 #if CMK_SMP && !COMM_THREAD_SEND
1519             CmiUnlock(tx_cq_lock);
1520 #endif
1521             smsg_recv_count ++;
1522             msg_tag = GNI_SMSG_ANY_TAG;
1523         } //endwhile getNext
1524     }   //end while GetEvent
1525     if(status == GNI_RC_ERROR_RESOURCE)
1526     {
1527         GNI_RC_CHECK("Smsg_rx_cq full", status);
1528     }
1529 }
1530
1531 static void printDesc(gni_post_descriptor_t *pd)
1532 {
1533     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1534 }
1535
1536 // for BIG_MSG called on receiver side for receiving control message
1537 // LMSG_INIT_TAG
1538 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1539 {
1540 #if     USE_LRTS_MEMPOOL
1541     CONTROL_MSG         *request_msg;
1542     gni_return_t        status = GNI_RC_SUCCESS;
1543     void                *msg_data;
1544     gni_post_descriptor_t *pd;
1545     gni_mem_handle_t    msg_mem_hndl;
1546     int source, size, transaction_size, offset = 0;
1547     int     register_size = 0;
1548
1549     // initial a get to transfer data from the sender side */
1550     request_msg = (CONTROL_MSG *) header;
1551     size = request_msg->total_length;
1552     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1553     if(request_msg->seq_id < 2)   {
1554         msg_data = CmiAlloc(size);
1555         CmiSetMsgSeq(msg_data, 0);
1556         _MEMCHECK(msg_data);
1557     }
1558     else {
1559         offset = ONE_SEG*(request_msg->seq_id-1);
1560         msg_data = (char*)request_msg->dest_addr + offset;
1561     }
1562    
1563     MallocPostDesc(pd);
1564     pd->cqwrite_value = request_msg->seq_id;
1565     if( request_msg->seq_id == 0)
1566     {
1567         pd->local_mem_hndl= GetMemHndl(msg_data);
1568         transaction_size = ALIGN64(size);
1569         if(IsMemHndlZero(pd->local_mem_hndl))
1570         {   
1571             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1572             if(status == GNI_RC_SUCCESS)
1573             {
1574                 pd->local_mem_hndl = GetMemHndl(msg_data);
1575             }
1576             else
1577             {
1578                 SetMemHndlZero(pd->local_mem_hndl);
1579             }
1580         }
1581         if(NoMsgInRecv( (void*)(msg_data)))
1582             register_size = GetMempoolsize((void*)(msg_data));
1583         else
1584             register_size = 0;
1585     }
1586     else{
1587         transaction_size = ALIGN64(request_msg->length);
1588         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1589         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1590         {
1591             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1592         }
1593     }
1594     pd->first_operand = ALIGN64(size);                   //  total length
1595
1596     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1597         pd->type            = GNI_POST_FMA_GET;
1598     else
1599         pd->type            = GNI_POST_RDMA_GET;
1600 #if REMOTE_EVENT
1601     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1602 #else
1603     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1604 #endif
1605     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1606     pd->length          = transaction_size;
1607     pd->local_addr      = (uint64_t) msg_data;
1608     pd->remote_addr     = request_msg->source_addr + offset;
1609     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1610     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1611     pd->rdma_mode       = 0;
1612     pd->amo_cmd         = 0;
1613
1614     //memory registration success
1615     if(status == GNI_RC_SUCCESS)
1616     {
1617 #if CMK_SMP && !COMM_THREAD_SEND
1618         CmiLock(tx_cq_lock);
1619 #endif
1620         if(pd->type == GNI_POST_RDMA_GET) 
1621             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1622         else
1623             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1624 #if CMK_SMP && !COMM_THREAD_SEND
1625         CmiUnlock(tx_cq_lock);
1626 #endif
1627          
1628         if(status == GNI_RC_SUCCESS )
1629         {
1630             if(pd->cqwrite_value == 0)
1631             {
1632 #if MACHINE_DEBUG_LOG
1633                 buffered_recv_msg += register_size;
1634                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1635 #endif
1636                 IncreaseMsgInRecv(msg_data);
1637
1638             }
1639         }
1640     }else
1641     {
1642         SetMemHndlZero((pd->local_mem_hndl));
1643     }
1644     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1645     {
1646         bufferRdmaMsg(inst_id, pd); 
1647     }else {
1648          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1649         GNI_RC_CHECK("GetLargeAFter posting", status);
1650     }
1651 #else
1652     CONTROL_MSG         *request_msg;
1653     gni_return_t        status;
1654     void                *msg_data;
1655     gni_post_descriptor_t *pd;
1656     RDMA_REQUEST        *rdma_request_msg;
1657     gni_mem_handle_t    msg_mem_hndl;
1658     //int source;
1659     // initial a get to transfer data from the sender side */
1660     request_msg = (CONTROL_MSG *) header;
1661     msg_data = CmiAlloc(request_msg->length);
1662     _MEMCHECK(msg_data);
1663
1664     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1665
1666     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1667     {
1668         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1669     }
1670
1671     MallocPostDesc(pd);
1672     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1673         pd->type            = GNI_POST_FMA_GET;
1674     else
1675         pd->type            = GNI_POST_RDMA_GET;
1676 #if REMOTE_EVENT
1677     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1678 #else
1679     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1680 #endif
1681     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1682     pd->length          = ALIGN64(request_msg->length);
1683     pd->local_addr      = (uint64_t) msg_data;
1684     pd->remote_addr     = request_msg->source_addr;
1685     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1686     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1687     pd->rdma_mode       = 0;
1688     pd->amo_cmd         = 0;
1689
1690     //memory registration successful
1691     if(status == GNI_RC_SUCCESS)
1692     {
1693         pd->local_mem_hndl  = msg_mem_hndl;
1694 #if CMK_SMP && !COMM_THREAD_SEND
1695             CmiLock(tx_cq_lock);
1696 #endif
1697         if(pd->type == GNI_POST_RDMA_GET) 
1698             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1699         else
1700             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1701 #if CMK_SMP && !COMM_THREAD_SEND
1702             CmiUnlock(tx_cq_lock);
1703 #endif
1704     }else
1705     {
1706         SetMemHndlZero(pd->local_mem_hndl);
1707     }
1708     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1709     {
1710         MallocRdmaRequest(rdma_request_msg);
1711         rdma_request_msg->next = 0;
1712         rdma_request_msg->destNode = inst_id;
1713         rdma_request_msg->pd = pd;
1714         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1715     }else {
1716         GNI_RC_CHECK("AFter posting", status);
1717     }
1718 #endif
1719 }
1720
1721 static void PumpLocalRdmaTransactions()
1722 {
1723     gni_cq_entry_t          ev;
1724     gni_return_t            status;
1725     uint64_t                type, inst_id;
1726     gni_post_descriptor_t   *tmp_pd;
1727     MSG_LIST                *ptr;
1728     CONTROL_MSG             *ack_msg_tmp;
1729     uint8_t             msg_tag;
1730 #ifdef CMK_DIRECT
1731     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1732 #endif
1733     SMSG_QUEUE         *queue = &smsg_queue;
1734
1735 #if CMK_SMP && !COMM_THREAD_SEND
1736     while(1) {
1737         CmiLock(tx_cq_lock);
1738         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1739         CmiUnlock(tx_cq_lock);
1740         if(status != GNI_RC_SUCCESS) break;
1741 #else
1742     while( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1743     {
1744 #endif
1745         type = GNI_CQ_GET_TYPE(ev);
1746         if (type == GNI_CQ_EVENT_TYPE_POST)
1747         {
1748             inst_id     = GNI_CQ_GET_INST_ID(ev);
1749 #if PRINT_SYH
1750             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1751 #endif
1752 #if CMK_SMP && !COMM_THREAD_SEND
1753             CmiLock(tx_cq_lock);
1754 #endif
1755             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1756 #if CMK_SMP && !COMM_THREAD_SEND
1757             CmiUnlock(tx_cq_lock);
1758 #endif
1759 #ifdef CMK_DIRECT
1760             if(tmp_pd->amo_cmd == 1)
1761             {
1762                 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1763                 cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
1764                 cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
1765                 cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
1766             }
1767             else{
1768                 MallocControlMsg(ack_msg_tmp);
1769                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1770                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1771             }
1772 #else
1773             MallocControlMsg(ack_msg_tmp);
1774             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1775             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1776 #endif
1777             ////Message is sent, free message , put is not used now
1778             switch (tmp_pd->type) {
1779 #if CMK_PERSISTENT_COMM
1780             case GNI_POST_RDMA_PUT:
1781 #if ! USE_LRTS_MEMPOOL
1782                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1783 #endif
1784             case GNI_POST_FMA_PUT:
1785                 CmiFree((void *)tmp_pd->local_addr);
1786                 msg_tag = PUT_DONE_TAG;
1787                 break;
1788 #endif
1789 #ifdef CMK_DIRECT
1790             case GNI_POST_RDMA_PUT:
1791             case GNI_POST_FMA_PUT:
1792                 //sender ACK to receiver to trigger it is done
1793                 msg_tag = DIRECT_PUT_DONE_TAG;
1794                 break;
1795 #endif
1796             case GNI_POST_RDMA_GET:
1797             case GNI_POST_FMA_GET:
1798 #if  ! USE_LRTS_MEMPOOL
1799                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1800                 msg_tag = ACK_TAG;  
1801 #else
1802                 ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
1803                 if(ack_msg_tmp->seq_id > 0)      // BIG_MSG
1804                 {
1805                     msg_tag = BIG_MSG_TAG; 
1806                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1807                     
1808                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1809                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1810                 } 
1811                 else
1812                 {
1813                     msg_tag = ACK_TAG;  
1814                     ack_msg_tmp->dest_addr = tmp_pd->local_addr;
1815                 }
1816                 ack_msg_tmp->length = tmp_pd->length;
1817                 ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1818 #endif
1819                 break;
1820             default:
1821                 CmiPrintf("type=%d\n", tmp_pd->type);
1822                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1823             }
1824 #if CMK_DIRECT
1825             if(tmp_pd->amo_cmd == 1)
1826                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1827             else
1828 #endif
1829                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
1830             if(status == GNI_RC_SUCCESS)
1831             {
1832 #if CMK_DIRECT
1833                 if(tmp_pd->amo_cmd == 1)
1834                     free(cmk_direct_done_msg); 
1835                 else
1836 #endif
1837                     FreeControlMsg(ack_msg_tmp);
1838
1839             }
1840 #if CMK_PERSISTENT_COMM
1841             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1842 #endif
1843             {
1844                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1845 #if PRINT_SYH
1846                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1847 #endif
1848 #if CMK_SMP_TRACE_COMMTHREAD
1849                     START_EVENT();
1850 #endif
1851                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1852                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1853 #if MACHINE_DEBUG_LOG
1854                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1855                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1856                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1857 #endif
1858 #if CMK_SMP_TRACE_COMMTHREAD
1859                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1860 #endif
1861                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1862                 }else if(msg_tag == BIG_MSG_TAG){
1863                     void *msg = (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1864                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1865                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1866 #if CMK_SMP_TRACE_COMMTHREAD
1867                         START_EVENT();
1868 #endif
1869 #if PRINT_SYH
1870                         printf("Pipeline msg done [%d]\n", myrank);
1871 #endif
1872 #if CMK_SMP_TRACE_COMMTHREAD
1873                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1874 #endif
1875                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1876                     }
1877                 }
1878             }
1879             FreePostDesc(tmp_pd);
1880         }
1881     } //end while
1882     if(status == GNI_RC_ERROR_RESOURCE)
1883     {
1884         GNI_RC_CHECK("Smsg_tx_cq full", status);
1885     }
1886 }
1887
1888 static void  SendRdmaMsg()
1889 {
1890     gni_return_t            status = GNI_RC_SUCCESS;
1891     gni_mem_handle_t        msg_mem_hndl;
1892     RDMA_REQUEST *ptr = 0, *tmp_ptr;
1893     RDMA_REQUEST *pre = 0;
1894     int i, register_size = 0;
1895     void                    *msg;
1896 #if CMK_SMP
1897     int len = PCQueueLength(sendRdmaBuf);
1898     for (i=0; i<len; i++)
1899     {
1900         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
1901         if (ptr == NULL) break;
1902 #else
1903     ptr = sendRdmaBuf;
1904     while (ptr!=0)
1905     {
1906 #endif 
1907         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1908         gni_post_descriptor_t *pd = ptr->pd;
1909         status = GNI_RC_SUCCESS;
1910         
1911         if(pd->cqwrite_value == 0)
1912         {
1913             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
1914             {
1915                 msg = (void*)(pd->local_addr);
1916                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1917                 if(status == GNI_RC_SUCCESS)
1918                 {
1919                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1920                 }
1921             }else
1922             {
1923                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1924             }
1925             if(NoMsgInRecv( (void*)(pd->local_addr)))
1926                 register_size = GetMempoolsize((void*)(pd->local_addr));
1927             else
1928                 register_size = 0;
1929         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
1930         {
1931             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
1932         }
1933         if(status == GNI_RC_SUCCESS)        //mem register good
1934         {
1935 #if CMK_SMP && !COMM_THREAD_SEND
1936             CmiLock(tx_cq_lock);
1937 #endif
1938             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1939                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1940             else
1941                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1942 #if CMK_SMP && !COMM_THREAD_SEND
1943             CmiUnlock(tx_cq_lock);
1944 #endif
1945
1946             if(status == GNI_RC_SUCCESS)    //post good
1947             {
1948 #if !CMK_SMP
1949                 tmp_ptr = ptr;
1950                 if(pre != 0) {
1951                     pre->next = ptr->next;
1952                 }
1953                 else {
1954                     sendRdmaBuf = ptr->next;
1955                 }
1956                 ptr = ptr->next;
1957                 FreeRdmaRequest(tmp_ptr);
1958 #endif
1959                 if(pd->cqwrite_value == 0)
1960                 {
1961                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
1962                 }
1963 #if MACHINE_DEBUG_LOG
1964                 buffered_recv_msg += register_size;
1965                 MACHSTATE(8, "GO request from buffered\n"); 
1966 #endif
1967             }else           // cannot post
1968             {
1969 #if CMK_SMP
1970                 PCQueuePush(sendRdmaBuf, (char*)ptr);
1971 #else
1972                 pre = ptr;
1973                 ptr = ptr->next;
1974 #endif
1975                 break;
1976             }
1977         } else          //memory registration fails
1978         {
1979 #if CMK_SMP
1980             PCQueuePush(sendRdmaBuf, (char*)ptr);
1981 #else
1982             pre = ptr;
1983             ptr = ptr->next;
1984 #endif
1985         }
1986     } //end while
1987 #if ! CMK_SMP
1988     if(ptr == 0)
1989         sendRdmaTail = pre;
1990 #endif
1991 }
1992
1993 // return 1 if all messages are sent
1994 static int SendBufferMsg(SMSG_QUEUE *queue)
1995 {
1996     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
1997     CONTROL_MSG         *control_msg_tmp;
1998     gni_return_t        status;
1999     int                 done = 1;
2000     int                 register_size;
2001     void                *register_addr;
2002     int                 index_previous = -1;
2003     int                 index = queue->smsg_head_index;
2004 #if CMI_EXERT_SEND_CAP
2005     int                 sent_cnt = 0;
2006 #endif
2007
2008 #if ! CMK_SMP
2009     index = queue->smsg_head_index;
2010 #else
2011     index = 0;
2012 #endif
2013 #if CMK_SMP
2014     while(index <mysize)
2015     {
2016         int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
2017         for (i=0; i<len; i++) 
2018         {
2019             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
2020             if (ptr == 0) break;
2021 #else
2022     while(index != -1)
2023     {
2024         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2025         pre = 0;
2026         while(ptr != 0)
2027         {
2028 #endif
2029             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2030             status = GNI_RC_ERROR_RESOURCE;
2031             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2032                 /* connection not exists yet */
2033             }
2034             else
2035             switch(ptr->tag)
2036             {
2037             case SMALL_DATA_TAG:
2038                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2039                 if(status == GNI_RC_SUCCESS)
2040                 {
2041                     CmiFree(ptr->msg);
2042                 }
2043                 break;
2044             case LMSG_INIT_TAG:
2045                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2046                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2047                 break;
2048             case   ACK_TAG:
2049             case   BIG_MSG_TAG:
2050                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
2051                 if(status == GNI_RC_SUCCESS)
2052                 {
2053                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2054                 }
2055                 break;
2056 #ifdef CMK_DIRECT
2057             case DIRECT_PUT_DONE_TAG:
2058                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2059                 if(status == GNI_RC_SUCCESS)
2060                 {
2061                     free((CMK_DIRECT_HEADER*)ptr->msg);
2062                 }
2063                 break;
2064
2065 #endif
2066             default:
2067                 printf("Weird tag\n");
2068                 CmiAbort("should not happen\n");
2069             }       // end switch
2070             if(status == GNI_RC_SUCCESS)
2071             {
2072 #if !CMK_SMP
2073                 tmp_ptr = ptr;
2074                 if(pre)
2075                 {
2076                     ptr = pre ->next = ptr->next;
2077                 }else
2078                 {
2079                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2080                 }
2081                 FreeMsgList(tmp_ptr);
2082 #else
2083                 FreeMsgList(ptr);
2084 #endif
2085 #if PRINT_SYH
2086                 buffered_smsg_counter--;
2087                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2088 #endif
2089 #if CMI_EXERT_SEND_CAP
2090                 sent_cnt++;
2091                 if(sent_cnt == SEND_CAP)
2092                     break;
2093 #endif
2094             }else {
2095 #if CMK_SMP
2096                 PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
2097 #else
2098                 pre = ptr;
2099                 ptr=ptr->next;
2100 #endif
2101                 done = 0;
2102                 if(status == GNI_RC_ERROR_RESOURCE)
2103                     break;
2104             } 
2105         } //end while
2106 #if !CMK_SMP
2107         if(ptr == 0)
2108             queue->smsg_msglist_index[index].tail = pre;
2109         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2110         {
2111             if(index_previous != -1)
2112                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2113             else
2114                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2115         }else
2116         {
2117             index_previous = index;
2118         }
2119         index = queue->smsg_msglist_index[index].next;
2120 #else
2121         index++;
2122 #endif
2123
2124 #if CMI_EXERT_SEND_CAP
2125         if(sent_cnt == SEND_CAP)
2126                 break;
2127 #endif
2128     }   // end pooling for all cores
2129     return done;
2130 }
2131
2132 static void ProcessDeadlock();
2133 void LrtsAdvanceCommunication(int whileidle)
2134 {
2135     /*  Receive Msg first */
2136 #if CMK_SMP_TRACE_COMMTHREAD
2137     double startT, endT;
2138 #endif
2139     if (useDynamicSMSG && whileidle)
2140     {
2141 #if CMK_SMP_TRACE_COMMTHREAD
2142         startT = CmiWallTimer();
2143 #endif
2144         PumpDatagramConnection();
2145 #if CMK_SMP_TRACE_COMMTHREAD
2146         endT = CmiWallTimer();
2147         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2148 #endif
2149     }
2150
2151 #if CMK_SMP_TRACE_COMMTHREAD
2152     startT = CmiWallTimer();
2153 #endif
2154     PumpNetworkSmsg();
2155     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2156 #if CMK_SMP_TRACE_COMMTHREAD
2157     endT = CmiWallTimer();
2158     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2159 #endif
2160
2161 #if CMK_SMP_TRACE_COMMTHREAD
2162     startT = CmiWallTimer();
2163 #endif
2164     PumpLocalRdmaTransactions();
2165     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2166 #if CMK_SMP_TRACE_COMMTHREAD
2167     endT = CmiWallTimer();
2168     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2169 #endif
2170     /* Send buffered Message */
2171 #if CMK_SMP_TRACE_COMMTHREAD
2172     startT = CmiWallTimer();
2173 #endif
2174 #if CMK_USE_OOB
2175     if (SendBufferMsg(&smsg_oob_queue) == 1)
2176 #endif
2177     SendBufferMsg(&smsg_queue);
2178     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2179 #if CMK_SMP_TRACE_COMMTHREAD
2180     endT = CmiWallTimer();
2181     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2182 #endif
2183
2184 #if CMK_SMP_TRACE_COMMTHREAD
2185     startT = CmiWallTimer();
2186 #endif
2187     SendRdmaMsg();
2188     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2189 #if CMK_SMP_TRACE_COMMTHREAD
2190     endT = CmiWallTimer();
2191     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2192 #endif
2193
2194 #if CMK_SMP
2195     if (_detected_hang)  ProcessDeadlock();
2196 #endif
2197 }
2198
2199 /* useDynamicSMSG */
2200 static void _init_dynamic_smsg()
2201 {
2202     gni_return_t status;
2203     uint32_t     vmdh_index = -1;
2204     int i;
2205
2206     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2207     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2208     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2209     for(i=0; i<mysize; i++) {
2210         smsg_connected_flag[i] = 0;
2211         smsg_attr_vector_local[i] = NULL;
2212         smsg_attr_vector_remote[i] = NULL;
2213     }
2214     if(mysize <=512)
2215     {
2216         SMSG_MAX_MSG = 4096;
2217     }else if (mysize <= 4096)
2218     {
2219         SMSG_MAX_MSG = 4096/mysize * 1024;
2220     }else if (mysize <= 16384)
2221     {
2222         SMSG_MAX_MSG = 512;
2223     }else {
2224         SMSG_MAX_MSG = 256;
2225     }
2226
2227     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2228     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2229     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2230     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2231     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2232
2233     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2234     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2235     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2236     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2237     mailbox_list->offset = 0;
2238     mailbox_list->next = 0;
2239     
2240     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2241         mailbox_list->size, smsg_rx_cqh,
2242         GNI_MEM_READWRITE,   
2243         vmdh_index,
2244         &(mailbox_list->mem_hndl));
2245     GNI_RC_CHECK("MEMORY registration for smsg", status);
2246
2247     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2248     GNI_RC_CHECK("Unbound EP", status);
2249     
2250     alloc_smsg_attr(&send_smsg_attr);
2251
2252     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2253     GNI_RC_CHECK("post unbound datagram", status);
2254
2255       /* always pre-connect to proc 0 */
2256     //if (myrank != 0) connect_to(0);
2257 }
2258
2259 static void _init_static_smsg()
2260 {
2261     gni_smsg_attr_t      *smsg_attr;
2262     gni_smsg_attr_t      remote_smsg_attr;
2263     gni_smsg_attr_t      *smsg_attr_vec;
2264     gni_mem_handle_t     my_smsg_mdh_mailbox;
2265     int      ret, i;
2266     gni_return_t status;
2267     uint32_t              vmdh_index = -1;
2268     mdh_addr_t            base_infor;
2269     mdh_addr_t            *base_addr_vec;
2270     char *env;
2271
2272     if(mysize <=512)
2273     {
2274         SMSG_MAX_MSG = 1024;
2275     }else if (mysize <= 4096)
2276     {
2277         SMSG_MAX_MSG = 1024;
2278     }else if (mysize <= 16384)
2279     {
2280         SMSG_MAX_MSG = 512;
2281     }else {
2282         SMSG_MAX_MSG = 256;
2283     }
2284     
2285     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2286     if (env) SMSG_MAX_MSG = atoi(env);
2287     CmiAssert(SMSG_MAX_MSG > 0);
2288
2289     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2290     
2291     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2292     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2293     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2294     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2295     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2296     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2297     CmiAssert(ret == 0);
2298     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2299     
2300     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2301             smsg_memlen*(mysize), smsg_rx_cqh,
2302             GNI_MEM_READWRITE,   
2303             vmdh_index,
2304             &my_smsg_mdh_mailbox);
2305     register_memory_size += smsg_memlen*(mysize);
2306     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2307
2308     if (myrank == 0)  printf("Charm++> SMSG memory: %d\n", smsg_memlen*(mysize));
2309     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2310
2311     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2312     base_infor.mdh =  my_smsg_mdh_mailbox;
2313     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2314
2315     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2316  
2317     for(i=0; i<mysize; i++)
2318     {
2319         if(i==myrank)
2320             continue;
2321         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2322         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2323         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2324         smsg_attr[i].mbox_offset = i*smsg_memlen;
2325         smsg_attr[i].buff_size = smsg_memlen;
2326         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2327         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2328     }
2329
2330     for(i=0; i<mysize; i++)
2331     {
2332         if (myrank == i) continue;
2333
2334         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2335         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2336         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2337         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2338         remote_smsg_attr.buff_size = smsg_memlen;
2339         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2340         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2341
2342         /* initialize the smsg channel */
2343         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2344         GNI_RC_CHECK("SMSG Init", status);
2345     } //end initialization
2346
2347     free(base_addr_vec);
2348     free(smsg_attr);
2349
2350     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2351     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2352
2353
2354 inline
2355 static void _init_send_queue(SMSG_QUEUE *queue)
2356 {
2357      int i;
2358      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2359      for(i =0; i<mysize; i++)
2360      {
2361         queue->smsg_msglist_index[i].next = -1;
2362 #if CMK_SMP
2363         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2364 #else
2365         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2366 #endif
2367         
2368      }
2369      queue->smsg_head_index = -1;
2370 }
2371
2372 inline
2373 static void _init_smsg()
2374 {
2375     if(mysize > 1) {
2376         if (useDynamicSMSG)
2377             _init_dynamic_smsg();
2378         else
2379             _init_static_smsg();
2380     }
2381
2382     _init_send_queue(&smsg_queue);
2383 #if CMK_USE_OOB
2384     _init_send_queue(&smsg_oob_queue);
2385 #endif
2386 }
2387
2388 static void _init_static_msgq()
2389 {
2390     gni_return_t status;
2391     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2392     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2393     msgq_attrs.smsg_q_sz = 1;
2394     msgq_attrs.rcv_pool_sz = 1;
2395     msgq_attrs.num_msgq_eps = 2;
2396     msgq_attrs.nloc_insts = 8;
2397     msgq_attrs.modes = 0;
2398     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2399
2400     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2401     GNI_RC_CHECK("MSGQ Init", status);
2402
2403
2404 }
2405
2406 #if CMK_SMP && STEAL_MEMPOOL
2407 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2408 {
2409     void *pool = NULL;
2410     int i, k;
2411     // check other ranks
2412     for (k=0; k<CmiMyNodeSize()+1; k++) {
2413         i = (CmiMyRank()+k)%CmiMyNodeSize();
2414         if (i==CmiMyRank()) continue;
2415         mempool_type *mptr = CpvAccessOther(mempool, i);
2416         CmiLock(mptr->mempoolLock);
2417         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2418         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2419             CmiUnlock(mptr->mempoolLock);
2420             continue;
2421         }
2422         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2423         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2424             /* search in the free list */
2425           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2426           mempool_header *current = free_header;
2427           while (current) {
2428             if (current->next_free == (char*)header-(char*)mptr) break;
2429             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2430           }
2431           if (current == NULL) {         /* not found in free list */
2432             CmiUnlock(mptr->mempoolLock);
2433             continue;
2434           }
2435 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2436             /* search the previous memblock, and remove the tail */
2437           mempool_block *ptr = (mempool_block *)mptr;
2438           while (ptr) {
2439             if (ptr->memblock_next == mptr->memblock_tail) break;
2440             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2441           }
2442           CmiAssert(ptr!=NULL);
2443           ptr->memblock_next = 0;
2444           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2445
2446             /* remove memblock from the free list */
2447           current->next_free = header->next_free;
2448           if (header == free_header) mptr->freelist_head = header->next_free;
2449
2450           CmiUnlock(mptr->mempoolLock);
2451
2452           pool = (void*)tail;
2453           *mem_hndl = tail->mem_hndl;
2454           *size = tail->size;
2455           return pool;
2456         }
2457         CmiUnlock(mptr->mempoolLock);
2458     }
2459
2460       /* steal failed, deregister and free memblock now */
2461     int freed = 0;
2462     for (k=0; k<CmiMyNodeSize()+1; k++) {
2463         i = (CmiMyRank()+k)%CmiMyNodeSize();
2464         mempool_type *mptr = CpvAccessOther(mempool, i);
2465         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2466
2467         mempool_block *mempools_head = &(mptr->mempools_head);
2468         mempool_block *current = mempools_head;
2469         mempool_block *prev = NULL;
2470
2471         while (current) {
2472           int isfree = 0;
2473           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2474 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2475           mempool_header *cur = free_header;
2476           mempool_header *header;
2477           if (current != mempools_head) {
2478             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2479              /* search in free list */
2480             if (header->size == current->size - sizeof(mempool_block)) {
2481               cur = free_header;
2482               while (cur) {
2483                 if (cur->next_free == (char*)header-(char*)mptr) break;
2484                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2485               }
2486               if (cur != NULL) isfree = 1;
2487             }
2488           }
2489           if (isfree) {
2490               /* remove from free list */
2491             cur->next_free = header->next_free;
2492             if (header == free_header) mptr->freelist_head = header->next_free;
2493              // deregister
2494             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2495             GNI_RC_CHECK("steal Mempool de-register", status);
2496             mempool_block *ptr = current;
2497             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2498             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2499 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2500             freed += ptr->size;
2501             free(ptr);
2502              // try now
2503             if (freed > *size) {
2504               if (pool == NULL) {
2505                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2506                 CmiAssert(ret == 0);
2507               }
2508               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2509               if (status == GNI_RC_SUCCESS) {
2510                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2511 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2512                 return pool;
2513               }
2514 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2515             }
2516           }
2517           else {
2518              prev = current;
2519              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2520           }
2521         }
2522
2523         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2524     }
2525       /* still no luck registering pool */
2526     if (pool) free(pool);
2527     return NULL;
2528 }
2529 #endif
2530
2531 static long long int total_mempool_size = 0;
2532 static long long int total_mempool_calls = 0;
2533
2534 #if USE_LRTS_MEMPOOL
2535 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2536 {
2537     void *pool;
2538     int ret;
2539
2540     int default_size =  expand_flag? _expand_mem : _mempool_size;
2541     if (*size < default_size) *size = default_size;
2542     total_mempool_size += *size;
2543     total_mempool_calls += 1;
2544     ret = posix_memalign(&pool, ALIGNBUF, *size);
2545     if (ret != 0) {
2546 #if CMK_SMP && STEAL_MEMPOOL
2547       pool = steal_mempool_block(size, mem_hndl);
2548       if (pool != NULL) return pool;
2549 #endif
2550       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2551       if (ret == ENOMEM)
2552         CmiAbort("alloc_mempool_block: out of memory.");
2553       else
2554         CmiAbort("alloc_mempool_block: posix_memalign failed");
2555     }
2556         SetMemHndlZero((*mem_hndl));
2557     
2558     return pool;
2559 }
2560
2561 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2562 {
2563     if(!(IsMemHndlZero(mem_hndl)))
2564     {
2565         gni_return_t status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2566         GNI_RC_CHECK("free_mempool_block Mempool de-register", status);
2567     }
2568     free(ptr);
2569 }
2570 #endif
2571
2572 void LrtsPreCommonInit(int everReturn){
2573 #if USE_LRTS_MEMPOOL
2574     CpvInitialize(mempool_type*, mempool);
2575     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block);
2576     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2577 #endif
2578 }
2579
2580
2581 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2582 {
2583     register int            i;
2584     int                     rc;
2585     int                     device_id = 0;
2586     unsigned int            remote_addr;
2587     gni_cdm_handle_t        cdm_hndl;
2588     gni_return_t            status = GNI_RC_SUCCESS;
2589     uint32_t                vmdh_index = -1;
2590     uint8_t                 ptag;
2591     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2592     int                     first_spawned;
2593     int                     physicalID;
2594     char                   *env;
2595
2596     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2597     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2598     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2599    
2600     status = PMI_Init(&first_spawned);
2601     GNI_RC_CHECK("PMI_Init", status);
2602
2603     status = PMI_Get_size(&mysize);
2604     GNI_RC_CHECK("PMI_Getsize", status);
2605
2606     status = PMI_Get_rank(&myrank);
2607     GNI_RC_CHECK("PMI_getrank", status);
2608
2609     //physicalID = CmiPhysicalNodeID(myrank);
2610     
2611     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2612
2613     *myNodeID = myrank;
2614     *numNodes = mysize;
2615   
2616 #if !COMM_THREAD_SEND
2617     /* Currently, we only consider the case that comm. thread will only recv msgs */
2618     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2619 #endif
2620
2621     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2622     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2623     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2624
2625     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2626     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2627     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2628
2629     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
2630     if (env) useDynamicSMSG = 1;
2631     if (!useDynamicSMSG)
2632       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2633     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2634     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2635     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2636     
2637     if(myrank == 0)
2638     {
2639         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2640         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2641     }
2642 #ifdef USE_ONESIDED
2643     onesided_init(NULL, &onesided_hnd);
2644
2645     // this is a GNI test, so use the libonesided bypass functionality
2646     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2647     local_addr = gniGetNicAddress();
2648 #else
2649     ptag = get_ptag();
2650     cookie = get_cookie();
2651 #if 0
2652     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2653 #endif
2654     //Create and attach to the communication  domain */
2655     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2656     GNI_RC_CHECK("GNI_CdmCreate", status);
2657     //* device id The device id is the minor number for the device
2658     //that is assigned to the device by the system when the device is created.
2659     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2660     //where X is the device number 0 default 
2661     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2662     GNI_RC_CHECK("GNI_CdmAttach", status);
2663     local_addr = get_gni_nic_address(0);
2664 #endif
2665     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2666     _MEMCHECK(MPID_UGNI_AllAddr);
2667     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2668     /* create the local completion queue */
2669     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2670     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2671     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2672     
2673     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2674     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2675     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2676
2677     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2678     GNI_RC_CHECK("Create CQ (rx)", status);
2679     
2680     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2681     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2682     
2683     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2684     //GNI_RC_CHECK("Create BTE CQ", status);
2685
2686     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2687     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2688     _MEMCHECK(ep_hndl_array);
2689 #if CMK_SMP && !COMM_THREAD_SEND
2690     tx_cq_lock  = CmiCreateLock();
2691     rx_cq_lock  = CmiCreateLock();
2692 #endif
2693     for (i=0; i<mysize; i++) {
2694         if(i == myrank) continue;
2695         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2696         GNI_RC_CHECK("GNI_EpCreate ", status);   
2697         remote_addr = MPID_UGNI_AllAddr[i];
2698         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2699         GNI_RC_CHECK("GNI_EpBind ", status);   
2700     }
2701
2702     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2703     _init_smsg();
2704     PMI_Barrier();
2705
2706 #if     USE_LRTS_MEMPOOL
2707     env = getenv("CHARM_UGNI_MEMPOOL_SIZE");
2708     if (env) _mempool_size = CmiReadSize(env);
2709     if (CmiGetArgStringDesc(*argv,"+useMemorypoolSize",&env,"Set the memory pool size")) 
2710         _mempool_size = CmiReadSize(env);
2711
2712     if (myrank==0) printf("Charm++> memory pool size: %1.fMB\n", _mempool_size/1024.0/1024);
2713
2714     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2715     if (env) {
2716         MAX_REG_MEM = CmiReadSize(env);
2717         if (myrank==0) printf("Charm++> memory pool maximum size: %1.fMB\n", MAX_REG_MEM/1024.0/1024);
2718     }
2719
2720     env = getenv("CHARM_UGNI_SEND_MAX");
2721     if (env) {
2722         MAX_BUFF_SEND = CmiReadSize(env);
2723         if (myrank==0) printf("Charm++> maximum pending memory pool for sending: %1.fMB\n", MAX_BUFF_SEND/1024.0/1024);
2724     }
2725
2726     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2727     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2728 #endif
2729
2730     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
2731     if (env) _checkProgress = 0;
2732     if (mysize == 1) _checkProgress = 0;
2733
2734     /* init DMA buffer for medium message */
2735
2736     //_init_DMA_buffer();
2737     
2738     free(MPID_UGNI_AllAddr);
2739 #if CMK_SMP
2740     sendRdmaBuf = PCQueueCreate();
2741 #else
2742     sendRdmaBuf = 0;
2743 #endif
2744 #if MACHINE_DEBUG_LOG
2745     char ln[200];
2746     sprintf(ln,"debugLog.%d",myrank);
2747     debugLog=fopen(ln,"w");
2748 #endif
2749
2750 }
2751
2752 void* LrtsAlloc(int n_bytes, int header)
2753 {
2754     void *ptr;
2755 #if 0
2756     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2757 #endif
2758     if(n_bytes <= SMSG_MAX_MSG)
2759     {
2760         int totalsize = n_bytes+header;
2761         ptr = malloc(totalsize);
2762     }
2763     else {
2764         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2765 #if     USE_LRTS_MEMPOOL
2766         n_bytes = ALIGN64(n_bytes);
2767         if(n_bytes < BIG_MSG)
2768         {
2769             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2770             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2771         }else 
2772         {
2773             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2774             ptr = res + ALIGNBUF - header;
2775
2776         }
2777 #else
2778         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2779         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2780         ptr = res + ALIGNBUF - header;
2781 #endif
2782     }
2783     return ptr;
2784 }
2785
2786 void  LrtsFree(void *msg)
2787 {
2788     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2789     if (size <= SMSG_MAX_MSG)
2790       free(msg);
2791     else if(size>=BIG_MSG)
2792     {
2793         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2794
2795     }else
2796     {
2797 #if     USE_LRTS_MEMPOOL
2798 #if CMK_SMP
2799         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2800 #else
2801         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2802 #endif
2803 #else
2804         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2805 #endif
2806     }
2807 }
2808
2809 void LrtsExit()
2810 {
2811     /* free memory ? */
2812 #if USE_LRTS_MEMPOOL
2813     mempool_destroy(CpvAccess(mempool));
2814 #endif
2815     PMI_Finalize();
2816     exit(0);
2817 }
2818
2819 void LrtsDrainResources()
2820 {
2821     if(mysize == 1) return;
2822     while (
2823 #if CMK_USE_OOB
2824            !SendBufferMsg(&smsg_oob_queue) ||
2825 #endif
2826            !SendBufferMsg(&smsg_queue))
2827     {
2828         if (useDynamicSMSG)
2829             PumpDatagramConnection();
2830         PumpNetworkSmsg();
2831         PumpLocalRdmaTransactions();
2832         SendRdmaMsg();
2833     }
2834     PMI_Barrier();
2835 }
2836
2837 void LrtsAbort(const char *message) {
2838     printf("CmiAbort is calling on PE:%d\n", myrank);
2839     CmiPrintStackTrace(0);
2840     PMI_Abort(-1, message);
2841 }
2842
2843 /**************************  TIMER FUNCTIONS **************************/
2844 #if CMK_TIMER_USE_SPECIAL
2845 /* MPI calls are not threadsafe, even the timer on some machines */
2846 static CmiNodeLock  timerLock = 0;
2847 static int _absoluteTime = 0;
2848 static int _is_global = 0;
2849 static struct timespec start_ts;
2850
2851 inline int CmiTimerIsSynchronized() {
2852     return 0;
2853 }
2854
2855 inline int CmiTimerAbsolute() {
2856     return _absoluteTime;
2857 }
2858
2859 double CmiStartTimer() {
2860     return 0.0;
2861 }
2862
2863 double CmiInitTime() {
2864     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
2865 }
2866
2867 void CmiTimerInit(char **argv) {
2868     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2869     if (_absoluteTime && CmiMyPe() == 0)
2870         printf("Charm++> absolute  timer is used\n");
2871     
2872     _is_global = CmiTimerIsSynchronized();
2873
2874
2875     if (_is_global) {
2876         if (CmiMyRank() == 0) {
2877             clock_gettime(CLOCK_MONOTONIC, &start_ts);
2878         }
2879     } else { /* we don't have a synchronous timer, set our own start time */
2880         CmiBarrier();
2881         CmiBarrier();
2882         CmiBarrier();
2883         clock_gettime(CLOCK_MONOTONIC, &start_ts);
2884     }
2885     CmiNodeAllBarrier();          /* for smp */
2886 }
2887
2888 /**
2889  * Since the timerLock is never created, and is
2890  * always NULL, then all the if-condition inside
2891  * the timer functions could be disabled right
2892  * now in the case of SMP.
2893  */
2894 double CmiTimer(void) {
2895     struct timespec now_ts;
2896     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2897     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2898         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2899 }
2900
2901 double CmiWallTimer(void) {
2902     struct timespec now_ts;
2903     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2904     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2905         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
2906 }
2907
2908 double CmiCpuTimer(void) {
2909     struct timespec now_ts;
2910     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2911     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2912         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2913 }
2914
2915 #endif
2916 /************Barrier Related Functions****************/
2917
2918 int CmiBarrier()
2919 {
2920     gni_return_t status;
2921
2922 #if CMK_SMP
2923     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2924     CmiNodeAllBarrier();
2925     if (CmiMyRank() == CmiMyNodeSize())
2926 #else
2927     if (CmiMyRank() == 0)
2928 #endif
2929     {
2930         /**
2931          *  The call of CmiBarrier is usually before the initialization
2932          *  of trace module of Charm++, therefore, the START_EVENT
2933          *  and END_EVENT are disabled here. -Chao Mei
2934          */
2935         /*START_EVENT();*/
2936         status = PMI_Barrier();
2937         GNI_RC_CHECK("PMI_Barrier", status);
2938         /*END_EVENT(10);*/
2939     }
2940     CmiNodeAllBarrier();
2941     return status;
2942
2943 }
2944 #if CMK_DIRECT
2945 #include "machine-cmidirect.c"
2946 #endif
2947 #if CMK_PERSISTENT_COMM
2948 #include "machine-persistent.c"
2949 #endif
2950
2951