c5ba18fded8d46696562adb8827559205938a5d2
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
12     export CHARM_UGNI_MEMPOOL_MAX=20M
13     export CHARM_UGNI_SEND_MAX=10M
14
15     CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
16     CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
17
18     other environment variables:
19
20     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes     # disable checking deadlock
21     export CHARM_UGNI_BIG_MSG_SIZE=4M           # set big message size protocol
22     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4    # set big message pipe len
23  */
24 /*@{*/
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <stdint.h>
29 #include <errno.h>
30 #include <malloc.h>
31 #include <unistd.h>
32
33 #include <gni_pub.h>
34 #include <pmi.h>
35 #include <time.h>
36
37 #include "converse.h"
38
39 #define PRINT_SYH  0
40
41 // Trace communication thread
42 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
43 #define TRACE_THRESHOLD     0.00005
44 #define CMI_MPI_TRACE_MOREDETAILED 0
45 #undef CMI_MPI_TRACE_USEREVENTS
46 #define CMI_MPI_TRACE_USEREVENTS 1
47 #else
48 #undef CMK_SMP_TRACE_COMMTHREAD
49 #define CMK_SMP_TRACE_COMMTHREAD 0
50 #endif
51
52 #define CMK_TRACE_COMMOVERHEAD 0
53 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
54 #undef CMI_MPI_TRACE_USEREVENTS
55 #define CMI_MPI_TRACE_USEREVENTS 1
56 #else
57 #undef CMK_TRACE_COMMOVERHEAD
58 #define CMK_TRACE_COMMOVERHEAD 0
59 #endif
60
61 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
62 CpvStaticDeclare(double, projTraceStart);
63 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
64 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
65 #else
66 #define  START_EVENT()
67 #define  END_EVENT(x)
68 #endif
69
70 #define CMI_EXERT_SEND_CAP      0
71 #define CMI_EXERT_RECV_CAP      0
72
73 #if CMI_EXERT_SEND_CAP
74 #define SEND_CAP 16
75 #endif
76
77 #if CMI_EXERT_RECV_CAP
78 #define RECV_CAP 2
79 #endif
80
81 #define REMOTE_EVENT                      0
82 #define USE_LRTS_MEMPOOL                  1
83
84 #if USE_LRTS_MEMPOOL
85
86 #define oneMB (1024ll*1024)
87 static CmiInt8 _mempool_size = 8*oneMB;
88 static CmiInt8 _expand_mem =  4*oneMB;
89
90 #endif
91
92 #if CMK_SMP && COMM_THREAD_SEND 
93 //Dynamic flow control about memory registration
94 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
95 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
96 #else
97 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
98 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
99 #endif
100
101 static CmiInt8 buffered_send_msg = 0;
102 static int register_memory_size = 0;
103
104 static int BIG_MSG  =  4*oneMB;
105 static int ONE_SEG  =  2*oneMB;
106 static int BIG_MSG_PIPELINE = 4;
107
108 #if CMK_SMP
109 #define COMM_THREAD_SEND 1
110 //#define MULTI_THREAD_SEND 1
111 #endif
112
113 #if CMK_SMP && MULTI_THREAD_SEND
114 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
115 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
116 #else
117 #define     CMI_GNI_LOCK
118 #define     CMI_GNI_UNLOCK
119 #endif
120
121 static int _checkProgress = 1;             /* check deadlock */
122 static int _detected_hang = 0;
123
124 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
125
126 // dynamic SMSG
127 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
128
129 static int avg_smsg_connection = 32;
130 static int                 *smsg_connected_flag= 0;
131 static gni_smsg_attr_t     **smsg_attr_vector_local;
132 static gni_smsg_attr_t     **smsg_attr_vector_remote;
133 static gni_ep_handle_t     ep_hndl_unbound;
134 static gni_smsg_attr_t     send_smsg_attr;
135 static gni_smsg_attr_t     recv_smsg_attr;
136
137 typedef struct _dynamic_smsg_mailbox{
138    void     *mailbox_base;
139    int      size;
140    int      offset;
141    gni_mem_handle_t  mem_hndl;
142    struct      _dynamic_smsg_mailbox  *next;
143 }dynamic_smsg_mailbox_t;
144
145 static dynamic_smsg_mailbox_t  *mailbox_list;
146
147 int         rdma_id = 0;
148
149 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
150 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
151
152 #if PRINT_SYH
153 int         lrts_send_msg_id = 0;
154 int         lrts_local_done_msg = 0;
155 int         lrts_send_rdma_success = 0;
156 #endif
157
158 #include "machine.h"
159
160 #include "pcqueue.h"
161
162 #include "mempool.h"
163
164 #if CMK_PERSISTENT_COMM
165 #include "machine-persistent.h"
166 #endif
167
168 //#define  USE_ONESIDED 1
169 #ifdef USE_ONESIDED
170 //onesided implementation is wrong, since no place to restore omdh
171 #include "onesided.h"
172 onesided_hnd_t   onesided_hnd;
173 onesided_md_t    omdh;
174 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
175
176 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
177
178 #else
179 uint8_t   onesided_hnd, omdh;
180 #if REMOTE_EVENT
181 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
182          status = GNI_RC_ERROR_NOMEM;} \
183         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
184                 if(status == GNI_RC_SUCCESS) register_memory_size += size;} }
185 #else
186 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
187     do {   \
188         if (register_memory_size + size >= MAX_REG_MEM) { \
189             status = GNI_RC_ERROR_NOMEM; \
190         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
191             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
192     } while(0)
193 #endif
194 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
195     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
196              register_memory_size -= size; \
197          else CmiAbort("MEM_DEregister");  \
198     } while (0)
199 #endif
200
201 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
202 #define   DecreaseMsgInRecv(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)--
203 #define   IncreaseMsgInSend(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)++
204                                    //printf("++++[%d]%p   ----- %d\n", myrank, ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
205 #define   DecreaseMsgInSend(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)--
206                                      //printf("---[%d]%p   ----- %d\n", myrank,((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
207 #define   GetMempoolBlockPtr(x)  ((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr
208 #define   GetMempoolPtr(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mptr
209 #define   GetMempoolsize(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->size
210 #define   GetMemHndl(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mem_hndl
211 #define   GetMemHndlFromHeader(x) ((block_header*)x)->mem_hndl
212 #define   GetSizeFromHeader(x) ((block_header*)x)->size
213 #define   NoMsgInSend(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send == 0
214 #define   NoMsgInRecv(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv == 0
215 #define   NoMsgInFlight(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send + ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv  == 0
216 #define   IsMemHndlZero(x)  ((x).qword1 == 0 && (x).qword2 == 0)
217 #define   SetMemHndlZero(x)  do { (x).qword1 = 0; (x).qword2 = 0; } while (0)
218 #define   NotRegistered(x)  IsMemHndlZero(((block_header*)x)->mem_hndl)
219
220 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
221 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
222 #define CmiGetMsgSeq(m)  ((CmiMsgHeaderExt*)m)->seq
223 #define CmiSetMsgSeq(m, s)  ((((CmiMsgHeaderExt*)m)->seq) = (s))
224
225 #define ALIGNBUF                64
226
227 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
228 /* If SMSG is not used */
229
230 #define FMA_PER_CORE  1024
231 #define FMA_BUFFER_SIZE 1024
232
233 /* If SMSG is used */
234 static int  SMSG_MAX_MSG = 1024;
235 #define SMSG_MAX_CREDIT 72 
236
237 #define MSGQ_MAXSIZE       2048
238 /* large message transfer with FMA or BTE */
239 #define LRTS_GNI_RDMA_THRESHOLD  1024 
240
241 #if CMK_SMP
242 static int  REMOTE_QUEUE_ENTRIES=163840; 
243 static int LOCAL_QUEUE_ENTRIES=163840; 
244 #else
245 static int  REMOTE_QUEUE_ENTRIES=20480;
246 static int LOCAL_QUEUE_ENTRIES=20480; 
247 #endif
248
249 #define BIG_MSG_TAG  0x26
250 #define PUT_DONE_TAG      0x28
251 #define DIRECT_PUT_DONE_TAG      0x29
252 #define ACK_TAG           0x30
253 /* SMSG is data message */
254 #define SMALL_DATA_TAG          0x31
255 /* SMSG is a control message to initialize a BTE */
256 #define MEDIUM_HEAD_TAG         0x32
257 #define MEDIUM_DATA_TAG         0x33
258 #define LMSG_INIT_TAG           0x39 
259 #define VERY_LMSG_INIT_TAG      0x40 
260 #define VERY_LMSG_TAG           0x41 
261
262 #define DEBUG
263 #ifdef GNI_RC_CHECK
264 #undef GNI_RC_CHECK
265 #endif
266 #ifdef DEBUG
267 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
268 #else
269 #define GNI_RC_CHECK(msg,rc)
270 #endif
271
272 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
273 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
274
275 static int useStaticMSGQ = 0;
276 static int useStaticFMA = 0;
277 static int mysize, myrank;
278 gni_nic_handle_t      nic_hndl;
279
280 typedef struct {
281     gni_mem_handle_t mdh;
282     uint64_t addr;
283 } mdh_addr_t ;
284 // this is related to dynamic SMSG
285
286 typedef struct mdh_addr_list{
287     gni_mem_handle_t mdh;
288    void *addr;
289     struct mdh_addr_list *next;
290 }mdh_addr_list_t;
291
292 static unsigned int         smsg_memlen;
293 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
294 mdh_addr_t          setup_mem;
295 mdh_addr_t          *smsg_connection_vec = 0;
296 gni_mem_handle_t    smsg_connection_memhndl;
297 static int          smsg_expand_slots = 10;
298 static int          smsg_available_slot = 0;
299 static void         *smsg_mailbox_mempool = 0;
300 mdh_addr_list_t     *smsg_dynamic_list = 0;
301
302 static void             *smsg_mailbox_base;
303 gni_msgq_attr_t         msgq_attrs;
304 gni_msgq_handle_t       msgq_handle;
305 gni_msgq_ep_attr_t      msgq_ep_attrs;
306 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
307
308 /* =====Beginning of Declarations of Machine Specific Variables===== */
309 static int cookie;
310 static int modes = 0;
311 static gni_cq_handle_t       smsg_rx_cqh = NULL;
312 static gni_cq_handle_t       smsg_tx_cqh = NULL;
313 static gni_cq_handle_t       post_rx_cqh = NULL;
314 static gni_cq_handle_t       post_tx_cqh = NULL;
315 static gni_ep_handle_t       *ep_hndl_array;
316 #if CMK_SMP && MULTI_THREAD_SEND
317 static CmiNodeLock           *ep_lock_array;
318 static CmiNodeLock           tx_cq_lock; 
319 static CmiNodeLock           rx_cq_lock;
320 static CmiNodeLock           *mempool_lock;
321 #endif
322 typedef struct msg_list
323 {
324     uint32_t destNode;
325     uint32_t size;
326     void *msg;
327     uint8_t tag;
328 #if !CMK_SMP
329     struct msg_list *next;
330 #endif
331 }MSG_LIST;
332
333
334 typedef struct control_msg
335 {
336     uint64_t            source_addr;    /* address from the start of buffer  */
337     uint64_t            dest_addr;      /* address from the start of buffer */
338     //int                 source;         /* source rank */
339     int                 total_length;   /* total length */
340     int                 length;         /* length of this packet */
341     uint8_t             seq_id;         //big message   0 meaning single message
342     gni_mem_handle_t    source_mem_hndl;
343     struct control_msg *next;
344 }CONTROL_MSG;
345
346 #ifdef CMK_DIRECT
347 typedef struct{
348     void *recverBuf;
349     void (*callbackFnPtr)(void *);
350     void *callbackData;
351 }CMK_DIRECT_HEADER;
352 #endif
353 typedef struct  rmda_msg
354 {
355     int                   destNode;
356     gni_post_descriptor_t *pd;
357 #if !CMK_SMP
358     struct  rmda_msg      *next;
359 #endif
360 }RDMA_REQUEST;
361
362 #if CMK_SMP
363 PCQueue sendRdmaBuf;
364 typedef struct  msg_list_index
365 {
366     int         next;
367     PCQueue     sendSmsgBuf;
368 } MSG_LIST_INDEX;
369 #else
370 static RDMA_REQUEST        *sendRdmaBuf = 0;
371 static RDMA_REQUEST        *sendRdmaTail = 0;
372 typedef struct  msg_list_index
373 {
374     int         next;
375     MSG_LIST    *sendSmsgBuf;
376     MSG_LIST    *tail;
377 } MSG_LIST_INDEX;
378 #endif
379
380 /* reuse PendingMsg memory */
381 static CONTROL_MSG          *control_freelist=0;
382 static MSG_LIST             *msglist_freelist=0;
383
384 typedef struct smsg_queue
385 {
386     MSG_LIST_INDEX   *smsg_msglist_index;
387     int               smsg_head_index;
388 } SMSG_QUEUE;
389
390 SMSG_QUEUE                  smsg_queue;
391 SMSG_QUEUE                  smsg_oob_queue;
392
393 #if CMK_SMP
394
395 #define FreeMsgList(d)   free(d);
396 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
397
398 #else
399
400 #define FreeMsgList(d)  \
401   (d)->next = msglist_freelist;\
402   msglist_freelist = d;
403
404 #define MallocMsgList(d) \
405   d = msglist_freelist;\
406   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
407              _MEMCHECK(d);\
408   } else msglist_freelist = d->next; \
409   d->next =0;
410 #endif
411
412 #if CMK_SMP
413
414 #define FreeControlMsg(d)      free(d);
415 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
416
417 #else
418
419 #define FreeControlMsg(d)       \
420   (d)->next = control_freelist;\
421   control_freelist = d;
422
423 #define MallocControlMsg(d) \
424   d = control_freelist;\
425   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
426              _MEMCHECK(d);\
427   } else control_freelist = d->next;
428
429 #endif
430
431 static RDMA_REQUEST         *rdma_freelist = NULL;
432
433 #define FreeMediumControlMsg(d)       \
434   (d)->next = medium_control_freelist;\
435   medium_control_freelist = d;
436
437
438 #define MallocMediumControlMsg(d) \
439     d = medium_control_freelist;\
440     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
441     _MEMCHECK(d);\
442 } else mediumcontrol_freelist = d->next;
443
444 # if CMK_SMP
445 #define FreeRdmaRequest(d)       free(d);
446 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
447 #else
448
449 #define FreeRdmaRequest(d)       \
450   (d)->next = rdma_freelist;\
451   rdma_freelist = d;
452
453 #define MallocRdmaRequest(d) \
454   d = rdma_freelist;\
455   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
456              _MEMCHECK(d);\
457   } else rdma_freelist = d->next; \
458     d->next =0;
459 #endif
460
461 /* reuse gni_post_descriptor_t */
462 static gni_post_descriptor_t *post_freelist=0;
463
464 #if  !CMK_SMP
465 #define FreePostDesc(d)       \
466     (d)->next_descr = post_freelist;\
467     post_freelist = d;
468
469 #define MallocPostDesc(d) \
470   d = post_freelist;\
471   if (d==0) { \
472      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
473      _MEMCHECK(d);\
474   } else post_freelist = d->next_descr;
475 #else
476
477 #define FreePostDesc(d)     free(d);
478 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
479
480 #endif
481
482
483 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
484 static int      buffered_smsg_counter = 0;
485
486 /* SmsgSend return success but message sent is not confirmed by remote side */
487 static MSG_LIST *buffered_fma_head = 0;
488 static MSG_LIST *buffered_fma_tail = 0;
489
490 /* functions  */
491 #define IsFree(a,ind)  !( a& (1<<(ind) ))
492 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
493 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
494
495 CpvDeclare(mempool_type*, mempool);
496
497 /* get the upper bound of log 2 */
498 int mylog2(int size)
499 {
500     int op = size;
501     unsigned int ret=0;
502     unsigned int mask = 0;
503     int i;
504     while(op>0)
505     {
506         op = op >> 1;
507         ret++;
508
509     }
510     for(i=1; i<ret; i++)
511     {
512         mask = mask << 1;
513         mask +=1;
514     }
515
516     ret -= ((size &mask) ? 0:1);
517     return ret;
518 }
519
520 static void
521 allgather(void *in,void *out, int len)
522 {
523     static int *ivec_ptr=NULL,already_called=0,job_size=0;
524     int i,rc;
525     int my_rank;
526     char *tmp_buf,*out_ptr;
527
528     if(!already_called) {
529
530         rc = PMI_Get_size(&job_size);
531         CmiAssert(rc == PMI_SUCCESS);
532         rc = PMI_Get_rank(&my_rank);
533         CmiAssert(rc == PMI_SUCCESS);
534
535         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
536         CmiAssert(ivec_ptr != NULL);
537
538         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
539         CmiAssert(rc == PMI_SUCCESS);
540
541         already_called = 1;
542
543     }
544
545     tmp_buf = (char *)malloc(job_size * len);
546     CmiAssert(tmp_buf);
547
548     rc = PMI_Allgather(in,tmp_buf,len);
549     CmiAssert(rc == PMI_SUCCESS);
550
551     out_ptr = out;
552
553     for(i=0;i<job_size;i++) {
554
555         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
556
557     }
558
559     free(tmp_buf);
560 }
561
562 static void
563 allgather_2(void *in,void *out, int len)
564 {
565     //PMI_Allgather is out of order
566     int i,rc, extend_len;
567     int  rank_index;
568     char *out_ptr, *out_ref;
569     char *in2;
570
571     extend_len = sizeof(int) + len;
572     in2 = (char*)malloc(extend_len);
573
574     memcpy(in2, &myrank, sizeof(int));
575     memcpy(in2+sizeof(int), in, len);
576
577     out_ptr = (char*)malloc(mysize*extend_len);
578
579     rc = PMI_Allgather(in2, out_ptr, extend_len);
580     GNI_RC_CHECK("allgather", rc);
581
582     out_ref = out;
583
584     for(i=0;i<mysize;i++) {
585         //rank index 
586         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
587         //copy to the rank index slot
588         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
589     }
590
591     free(out_ptr);
592     free(in2);
593
594 }
595
596 static unsigned int get_gni_nic_address(int device_id)
597 {
598     unsigned int address, cpu_id;
599     gni_return_t status;
600     int i, alps_dev_id=-1,alps_address=-1;
601     char *token, *p_ptr;
602
603     p_ptr = getenv("PMI_GNI_DEV_ID");
604     if (!p_ptr) {
605         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
606        
607         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
608     } else {
609         while ((token = strtok(p_ptr,":")) != NULL) {
610             alps_dev_id = atoi(token);
611             if (alps_dev_id == device_id) {
612                 break;
613             }
614             p_ptr = NULL;
615         }
616         CmiAssert(alps_dev_id != -1);
617         p_ptr = getenv("PMI_GNI_LOC_ADDR");
618         CmiAssert(p_ptr != NULL);
619         i = 0;
620         while ((token = strtok(p_ptr,":")) != NULL) {
621             if (i == alps_dev_id) {
622                 alps_address = atoi(token);
623                 break;
624             }
625             p_ptr = NULL;
626             ++i;
627         }
628         CmiAssert(alps_address != -1);
629         address = alps_address;
630     }
631     return address;
632 }
633
634 static uint8_t get_ptag(void)
635 {
636     char *p_ptr, *token;
637     uint8_t ptag;
638
639     p_ptr = getenv("PMI_GNI_PTAG");
640     CmiAssert(p_ptr != NULL);
641     token = strtok(p_ptr, ":");
642     ptag = (uint8_t)atoi(token);
643     return ptag;
644         
645 }
646
647 static uint32_t get_cookie(void)
648 {
649     uint32_t cookie;
650     char *p_ptr, *token;
651
652     p_ptr = getenv("PMI_GNI_COOKIE");
653     CmiAssert(p_ptr != NULL);
654     token = strtok(p_ptr, ":");
655     cookie = (uint32_t)atoi(token);
656
657     return cookie;
658 }
659
660 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
661 /* TODO: add any that are related */
662 /* =====End of Definitions of Message-Corruption Related Macros=====*/
663
664
665 #include "machine-lrts.h"
666 #include "machine-common-core.c"
667
668 /* Network progress function is used to poll the network when for
669    messages. This flushes receive buffers on some  implementations*/
670 #if CMK_MACHINE_PROGRESS_DEFINED
671 void CmiMachineProgressImpl() {
672 }
673 #endif
674
675 static void SendRdmaMsg();
676 static void PumpNetworkSmsg();
677 static void PumpLocalRdmaTransactions();
678 static int SendBufferMsg(SMSG_QUEUE *queue);
679
680 #if MACHINE_DEBUG_LOG
681 FILE *debugLog = NULL;
682 static CmiInt8 buffered_recv_msg = 0;
683 int         lrts_smsg_success = 0;
684 int         lrts_received_msg = 0;
685 #endif
686
687 static void sweep_mempool(mempool_type *mptr)
688 {
689     block_header *current = &(mptr->block_head);
690
691     printf("[n %d] sweep_mempool slot START.\n", myrank);
692     while( current!= NULL) {
693         printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
694         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
695     }
696     printf("[n %d] sweep_mempool slot END.\n", myrank);
697 }
698
699 inline
700 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
701 {
702     block_header *current = *from;
703
704     //while(register_memory_size>= MAX_REG_MEM)
705     //{
706         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
707             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
708
709         *from = current;
710         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
711         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh, GetSizeFromHeader(current));
712         SetMemHndlZero(GetMemHndlFromHeader(current));
713     //}
714     return GNI_RC_SUCCESS;
715 }
716
717 inline 
718 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
719 {
720     gni_return_t status = GNI_RC_SUCCESS;
721     //int size = GetMempoolsize(msg);
722     //void *blockaddr = GetMempoolBlockPtr(msg);
723     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
724    
725     block_header *current = &(mptr->block_head);
726     while(register_memory_size>= MAX_REG_MEM)
727     {
728         status = deregisterMemory(mptr, &current);
729         if (status != GNI_RC_SUCCESS) break;
730     }
731     if(register_memory_size>= MAX_REG_MEM) return status;
732
733     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
734     while(1)
735     {
736         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
737         if(status == GNI_RC_SUCCESS)
738         {
739             break;
740         }
741         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
742         {
743             CmiAbort("Memory registor for mempool fails\n");
744         }
745         else
746         {
747             status = deregisterMemory(mptr, &current);
748             if (status != GNI_RC_SUCCESS) break;
749         }
750     }; 
751     return status;
752 }
753
754 inline 
755 static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
756 {
757     static int rank = -1;
758     int i;
759     gni_return_t status;
760     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
761     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
762     mempool_type *mptr;
763
764     status = registerFromMempool(mptr1, msg, size, t);
765     if (status == GNI_RC_SUCCESS) return status;
766 #if CMK_SMP 
767     for (i=0; i<CmiMyNodeSize()+1; i++) {
768       rank = (rank+1)%(CmiMyNodeSize()+1);
769       mptr = CpvAccessOther(mempool, rank);
770       if (mptr == mptr1) continue;
771       status = registerFromMempool(mptr, msg, size, t);
772       if (status == GNI_RC_SUCCESS) return status;
773     }
774 #endif
775     return  GNI_RC_ERROR_RESOURCE;
776 }
777
778 inline
779 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
780 {
781     MSG_LIST        *msg_tmp;
782     MallocMsgList(msg_tmp);
783     msg_tmp->destNode = destNode;
784     msg_tmp->size   = size;
785     msg_tmp->msg    = msg;
786     msg_tmp->tag    = tag;
787
788 #if !CMK_SMP
789     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
790         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
791         queue->smsg_head_index = destNode;
792         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
793     }else
794     {
795         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
796         queue->smsg_msglist_index[destNode].tail = msg_tmp;
797     }
798 #else
799     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
800 #endif
801 #if PRINT_SYH
802     buffered_smsg_counter++;
803 #endif
804 }
805
806 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
807 {
808     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
809 }
810
811 inline
812 static void setup_smsg_connection(int destNode)
813 {
814     mdh_addr_list_t  *new_entry = 0;
815     gni_post_descriptor_t *pd;
816     gni_smsg_attr_t      *smsg_attr;
817     gni_return_t status = GNI_RC_NOT_DONE;
818     RDMA_REQUEST        *rdma_request_msg;
819     
820     if(smsg_available_slot == smsg_expand_slots)
821     {
822         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
823         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
824         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
825
826         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
827             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
828             GNI_MEM_READWRITE,   
829             -1,
830             &(new_entry->mdh));
831         smsg_available_slot = 0; 
832         new_entry->next = smsg_dynamic_list;
833         smsg_dynamic_list = new_entry;
834     }
835     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
836     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
837     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
838     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
839     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
840     smsg_attr->buff_size = smsg_memlen;
841     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
842     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
843     smsg_local_attr_vec[destNode] = smsg_attr;
844     smsg_available_slot++;
845     MallocPostDesc(pd);
846     pd->type            = GNI_POST_FMA_PUT;
847     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
848     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
849     pd->length          = sizeof(gni_smsg_attr_t);
850     pd->local_addr      = (uint64_t) smsg_attr;
851     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
852     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
853     pd->src_cq_hndl     = 0;
854     pd->rdma_mode       = 0;
855     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
856     print_smsg_attr(smsg_attr);
857     if(status == GNI_RC_ERROR_RESOURCE )
858     {
859         MallocRdmaRequest(rdma_request_msg);
860         rdma_request_msg->destNode = destNode;
861         rdma_request_msg->pd = pd;
862         /* buffer this request */
863     }
864 #if PRINT_SYH
865     if(status != GNI_RC_SUCCESS)
866        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
867     else
868         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
869 #endif
870 }
871
872 /* useDynamicSMSG */
873 inline 
874 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
875 {
876     gni_return_t status = GNI_RC_NOT_DONE;
877
878     if(mailbox_list->offset == mailbox_list->size)
879     {
880         dynamic_smsg_mailbox_t *new_mailbox_entry;
881         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
882         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
883         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
884         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
885         new_mailbox_entry->offset = 0;
886         
887         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
888             new_mailbox_entry->size, smsg_rx_cqh,
889             GNI_MEM_READWRITE,   
890             -1,
891             &(new_mailbox_entry->mem_hndl));
892
893         GNI_RC_CHECK("register", status);
894         new_mailbox_entry->next = mailbox_list;
895         mailbox_list = new_mailbox_entry;
896     }
897     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
898     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
899     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
900     local_smsg_attr->mbox_offset = mailbox_list->offset;
901     mailbox_list->offset += smsg_memlen;
902     local_smsg_attr->buff_size = smsg_memlen;
903     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
904     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
905 }
906
907 /* useDynamicSMSG */
908 inline 
909 static int connect_to(int destNode)
910 {
911     gni_return_t status = GNI_RC_NOT_DONE;
912     CmiAssert(smsg_connected_flag[destNode] == 0);
913     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
914     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
915     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
916     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
917     
918     CMI_GNI_LOCK
919     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
920     CMI_GNI_UNLOCK
921     if (status == GNI_RC_ERROR_RESOURCE) {
922       /* possibly destNode is making connection at the same time */
923       free(smsg_attr_vector_local[destNode]);
924       smsg_attr_vector_local[destNode] = NULL;
925       free(smsg_attr_vector_remote[destNode]);
926       smsg_attr_vector_remote[destNode] = NULL;
927       mailbox_list->offset -= smsg_memlen;
928       return 0;
929     }
930     GNI_RC_CHECK("GNI_Post", status);
931     smsg_connected_flag[destNode] = 1;
932     return 1;
933 }
934
935 inline 
936 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
937 {
938     unsigned int          remote_address;
939     uint32_t              remote_id;
940     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
941     gni_smsg_attr_t       *smsg_attr;
942     gni_post_descriptor_t *pd;
943     gni_post_state_t      post_state;
944     char                  *real_data; 
945
946     if (useDynamicSMSG) {
947         switch (smsg_connected_flag[destNode]) {
948         case 0: 
949             connect_to(destNode);         /* continue to case 1 */
950         case 1:                           /* pending connection, do nothing */
951             status = GNI_RC_NOT_DONE;
952             if(inbuff ==0)
953                 buffer_small_msgs(queue, msg, size, destNode, tag);
954             return status;
955         }
956     }
957 #if CMK_SMP
958     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
959     {
960 #else
961     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
962     {
963 #endif
964         CMI_GNI_LOCK
965         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
966         CMI_GNI_UNLOCK
967         if(status == GNI_RC_SUCCESS)
968         {
969 #if CMK_SMP_TRACE_COMMTHREAD
970             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
971             { 
972                 START_EVENT();
973                 if ( tag == SMALL_DATA_TAG)
974                     real_data = (char*)msg; 
975                 else 
976                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
977                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
978             }
979 #endif
980             smsg_send_count ++;
981             return status;
982         }else
983             status = GNI_RC_ERROR_RESOURCE;
984     }
985     if(inbuff ==0)
986         buffer_small_msgs(queue, msg, size, destNode, tag);
987     return status;
988 }
989
990
991 inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
992 {
993     /* construct a control message and send */
994     CONTROL_MSG         *control_msg_tmp;
995     MallocControlMsg(control_msg_tmp);
996     control_msg_tmp->source_addr = (uint64_t)msg;
997     control_msg_tmp->seq_id    = seqno;
998     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
999 #if     USE_LRTS_MEMPOOL
1000     if(size < BIG_MSG)
1001     {
1002         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1003     }
1004     else
1005     {
1006         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1007         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1008         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1009     }
1010 #else
1011     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1012 #endif
1013     return control_msg_tmp;
1014 }
1015
1016 // Large message, send control to receiver, receiver register memory and do a GET, 
1017 // return 1 - send no success
1018 inline
1019 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1020 {
1021     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1022     uint32_t            vmdh_index  = -1;
1023     int                 size;
1024     int                 offset = 0;
1025     uint64_t            source_addr;
1026     int                 register_size; 
1027     void                *msg;
1028
1029     size    =   control_msg_tmp->total_length;
1030     source_addr = control_msg_tmp->source_addr;
1031     register_size = control_msg_tmp->length;
1032
1033 #if     USE_LRTS_MEMPOOL
1034     if( control_msg_tmp->seq_id == 0 ){
1035         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1036         {
1037             msg = (void*)source_addr;
1038             //register the corresponding mempool
1039             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1040             {
1041                 if(!inbuff)
1042                     buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1043                 return GNI_RC_ERROR_NOMEM;
1044             }
1045             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1046             if(status == GNI_RC_SUCCESS)
1047             {
1048                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1049             }
1050         }else
1051         {
1052             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1053             status = GNI_RC_SUCCESS;
1054         }
1055         if(NoMsgInSend( control_msg_tmp->source_addr))
1056             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1057         else
1058             register_size = 0;
1059     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1060     {
1061         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1062         source_addr += offset;
1063         size = control_msg_tmp->length;
1064         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1065             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1066             {
1067                 if(!inbuff)
1068                     buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1069                 return GNI_RC_ERROR_NOMEM;
1070             }
1071             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1072             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1073         }
1074         else
1075         {
1076             status = GNI_RC_SUCCESS;
1077         }
1078         register_size = 0;  
1079     }
1080
1081     if(status == GNI_RC_SUCCESS)
1082     {
1083         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
1084         if(status == GNI_RC_SUCCESS)
1085         {
1086             buffered_send_msg += register_size;
1087             if(control_msg_tmp->seq_id == 0)
1088             {
1089                 IncreaseMsgInSend(source_addr);
1090             }
1091             FreeControlMsg(control_msg_tmp);
1092             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1093         }else
1094             status = GNI_RC_ERROR_RESOURCE;
1095
1096     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1097     {
1098         CmiAbort("Memory registor for large msg\n");
1099     }else 
1100     {
1101         status = GNI_RC_ERROR_NOMEM; 
1102         if(!inbuff)
1103             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1104     }
1105     return status;
1106 #else
1107     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1108     if(status == GNI_RC_SUCCESS)
1109     {
1110         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
1111         if(status == GNI_RC_SUCCESS)
1112         {
1113             FreeControlMsg(control_msg_tmp);
1114         }
1115     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1116     {
1117         CmiAbort("Memory registor for large msg\n");
1118     }else 
1119     {
1120         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1121     }
1122     return status;
1123 #endif
1124 }
1125
1126 inline void LrtsPrepareEnvelope(char *msg, int size)
1127 {
1128     CmiSetMsgSize(msg, size);
1129 }
1130
1131 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1132 {
1133     gni_return_t        status  =   GNI_RC_SUCCESS;
1134     uint8_t tag;
1135     CONTROL_MSG         *control_msg_tmp;
1136     int                 oob = ( mode & OUT_OF_BAND);
1137     SMSG_QUEUE          *queue;
1138
1139     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1140 #if CMK_USE_OOB
1141     queue = oob? &smsg_oob_queue : &smsg_queue;
1142 #else
1143     queue = &smsg_queue;
1144 #endif
1145
1146     LrtsPrepareEnvelope(msg, size);
1147
1148 #if PRINT_SYH
1149     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1150 #endif 
1151 #if CMK_SMP && COMM_THREAD_SEND
1152     if(size <= SMSG_MAX_MSG)
1153         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1154     else if (size < BIG_MSG) {
1155         control_msg_tmp =  construct_control_msg(size, msg, 0);
1156         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1157     }
1158     else {
1159           CmiSetMsgSeq(msg, 0);
1160           control_msg_tmp =  construct_control_msg(size, msg, 1);
1161           buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1162     }
1163 #else   //non-smp, smp(worker sending)
1164     if(size <= SMSG_MAX_MSG)
1165     {
1166         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1167             CmiFree(msg);
1168     }
1169     else if (size < BIG_MSG) {
1170         control_msg_tmp =  construct_control_msg(size, msg, 0);
1171         send_large_messages(queue, destNode, control_msg_tmp, 0);
1172     }
1173     else {
1174 #if     USE_LRTS_MEMPOOL
1175         CmiSetMsgSeq(msg, 0);
1176         control_msg_tmp =  construct_control_msg(size, msg, 1);
1177         send_large_messages(queue, destNode, control_msg_tmp, 0);
1178 #else
1179         control_msg_tmp =  construct_control_msg(size, msg, 0);
1180         send_large_messages(queue, destNode, control_msg_tmp, 0);
1181 #endif
1182     }
1183 #endif
1184     return 0;
1185 }
1186
1187 static void    PumpDatagramConnection();
1188 static void registerUserTraceEvents() {
1189 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1190     traceRegisterUserEvent("setting up connections", 10);
1191     traceRegisterUserEvent("Receiving small msgs", 20);
1192     traceRegisterUserEvent("Release local transaction", 30);
1193     traceRegisterUserEvent("Sending buffered small msgs", 40);
1194     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1195 #endif
1196 }
1197
1198 static void ProcessDeadlock()
1199 {
1200     static CmiUInt8 *ptr = NULL;
1201     static CmiUInt8  last = 0, mysum, sum;
1202     static int count = 0;
1203     gni_return_t status;
1204     int i;
1205
1206 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1207 //sweep_mempool(CpvAccess(mempool));
1208     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1209     mysum = smsg_send_count + smsg_recv_count;
1210     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1211     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1212     GNI_RC_CHECK("PMI_Allgather", status);
1213     sum = 0;
1214     for (i=0; i<mysize; i++)  sum+= ptr[i];
1215     if (last == 0 || sum == last) 
1216         count++;
1217     else
1218         count = 0;
1219     last = sum;
1220     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1221     if (count == 2) { 
1222         /* detected twice, it is a real deadlock */
1223         if (myrank == 0)  {
1224             CmiPrintf("Charm++> network progress engine stalled, program may hang. Try set environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX to limit the registered memory usage.\n");
1225             CmiAbort("Fatal> Deadlock detected.");
1226         }
1227     }
1228     _detected_hang = 0;
1229 }
1230
1231 static void CheckProgress()
1232 {
1233     if (smsg_send_count == last_smsg_send_count &&
1234         smsg_recv_count == last_smsg_recv_count ) 
1235     {
1236         _detected_hang = 1;
1237 #if !CMK_SMP
1238         if (_detected_hang) ProcessDeadlock();
1239 #endif
1240
1241     }
1242     else {
1243         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1244         last_smsg_send_count = smsg_send_count;
1245         last_smsg_recv_count = smsg_recv_count;
1246         _detected_hang = 0;
1247     }
1248 }
1249
1250 static void set_limit()
1251 {
1252     if (CmiMyRank() == 0) {
1253         int mynode = CmiPhysicalNodeID(CmiMyPe());
1254         int numpes = CmiNumPesOnPhysicalNode(mynode);
1255         int numprocesses = numpes / CmiMyNodeSize();
1256         int totalmem = 1024*1024*1024;
1257         int mem_max = totalmem / numprocesses;
1258         int send_max = mem_max / 2;
1259     }
1260 }
1261
1262 void LrtsPostCommonInit(int everReturn)
1263 {
1264 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1265     CpvInitialize(double, projTraceStart);
1266     /* only PE 0 needs to care about registration (to generate sts file). */
1267     if (CmiMyPe() == 0) {
1268         registerMachineUserEventsFunction(&registerUserTraceEvents);
1269     }
1270 #endif
1271
1272 #if CMK_SMP
1273     CmiIdleState *s=CmiNotifyGetState();
1274     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1275     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1276 #else
1277     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1278     if (useDynamicSMSG)
1279     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1280 #endif
1281
1282     if (_checkProgress)
1283 #if CMK_SMP
1284     if (CmiMyRank() == 0)
1285 #endif
1286     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1287
1288     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1289 }
1290
1291 /* this is called by worker thread */
1292 void LrtsPostNonLocal(){
1293 #if CMK_SMP
1294 #if MULTI_THREAD_SEND
1295     if(mysize == 1) return;
1296     PumpLocalRdmaTransactions();
1297 #if CMK_USE_OOB
1298     if (SendBufferMsg(&smsg_oob_queue) == 1)
1299 #endif
1300     SendBufferMsg(&smsg_queue);
1301     SendRdmaMsg();
1302 #endif
1303 #endif
1304 }
1305
1306 /* useDynamicSMSG */
1307 static void    PumpDatagramConnection()
1308 {
1309     uint32_t          remote_address;
1310     uint32_t          remote_id;
1311     gni_return_t status;
1312     gni_post_state_t  post_state;
1313     uint64_t          datagram_id;
1314     int i;
1315
1316    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1317    {
1318        if (datagram_id >= mysize) {           /* bound endpoint */
1319            int pe = datagram_id - mysize;
1320            CMI_GNI_LOCK
1321            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1322            CMI_GNI_UNLOCK
1323            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1324            {
1325                CmiAssert(remote_id == pe);
1326                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1327                GNI_RC_CHECK("Dynamic SMSG Init", status);
1328 #if PRINT_SYH
1329                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1330 #endif
1331                CmiAssert(smsg_connected_flag[pe] == 1);
1332                smsg_connected_flag[pe] = 2;
1333            }
1334        }
1335        else {         /* unbound ep */
1336            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1337            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1338            {
1339                CmiAssert(remote_id<mysize);
1340                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1341                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1342                GNI_RC_CHECK("Dynamic SMSG Init", status);
1343 #if PRINT_SYH
1344                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1345 #endif
1346                smsg_connected_flag[remote_id] = 2;
1347
1348                alloc_smsg_attr(&send_smsg_attr);
1349                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1350                GNI_RC_CHECK("post unbound datagram", status);
1351            }
1352        }
1353    }
1354 }
1355
1356 /* pooling CQ to receive network message */
1357 static void PumpNetworkRdmaMsgs()
1358 {
1359     gni_cq_entry_t      event_data;
1360     gni_return_t        status;
1361
1362 }
1363
1364 inline 
1365 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1366 {
1367     RDMA_REQUEST        *rdma_request_msg;
1368     MallocRdmaRequest(rdma_request_msg);
1369     rdma_request_msg->destNode = inst_id;
1370     rdma_request_msg->pd = pd;
1371 #if CMK_SMP
1372     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1373 #else
1374     if(sendRdmaBuf == 0)
1375     {
1376         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1377     }else{
1378         sendRdmaTail->next = rdma_request_msg;
1379         sendRdmaTail =  rdma_request_msg;
1380     }
1381 #endif
1382
1383 }
1384 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1385 static void PumpNetworkSmsg()
1386 {
1387     uint64_t            inst_id;
1388     int                 ret;
1389     gni_cq_entry_t      event_data;
1390     gni_return_t        status, status2;
1391     void                *header;
1392     uint8_t             msg_tag;
1393     int                 msg_nbytes;
1394     void                *msg_data;
1395     gni_mem_handle_t    msg_mem_hndl;
1396     gni_smsg_attr_t     *smsg_attr;
1397     gni_smsg_attr_t     *remote_smsg_attr;
1398     int                 init_flag;
1399     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1400     uint64_t            source_addr;
1401     SMSG_QUEUE         *queue = &smsg_queue;
1402
1403     while(1)
1404     {
1405         CMI_GNI_LOCK
1406         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1407         CMI_GNI_UNLOCK
1408         if(status != GNI_RC_SUCCESS)
1409             break;
1410         inst_id = GNI_CQ_GET_INST_ID(event_data);
1411         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1412 #if PRINT_SYH
1413         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1414 #endif
1415         if (useDynamicSMSG) {
1416             /* subtle: smsg may come before connection is setup */
1417             while (smsg_connected_flag[inst_id] != 2) 
1418                PumpDatagramConnection();
1419         }
1420         msg_tag = GNI_SMSG_ANY_TAG;
1421         while(1) {
1422             CMI_GNI_LOCK
1423             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1424             CMI_GNI_UNLOCK
1425             if (status != GNI_RC_SUCCESS)
1426                 break;
1427 #if PRINT_SYH
1428             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1429 #endif
1430             /* copy msg out and then put into queue (small message) */
1431             switch (msg_tag) {
1432             case SMALL_DATA_TAG:
1433             {
1434                 START_EVENT();
1435                 msg_nbytes = CmiGetMsgSize(header);
1436                 msg_data    = CmiAlloc(msg_nbytes);
1437                 memcpy(msg_data, (char*)header, msg_nbytes);
1438 #if CMK_SMP_TRACE_COMMTHREAD
1439                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1440 #endif
1441                 handleOneRecvedMsg(msg_nbytes, msg_data);
1442                 break;
1443             }
1444             case LMSG_INIT_TAG:
1445             {
1446                 getLargeMsgRequest(header, inst_id);
1447                 break;
1448             }
1449             case ACK_TAG:   //msg fit into mempool
1450             {
1451                 /* Get is done, release message . Now put is not used yet*/
1452                 void *msg = (void*)(((CONTROL_MSG *)header)->source_addr);
1453 #if ! USE_LRTS_MEMPOOL
1454                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh, header_tmp->length);
1455 #else
1456                 DecreaseMsgInSend(msg);
1457 #endif
1458                 if(NoMsgInSend(msg))
1459                     buffered_send_msg -= GetMempoolsize(msg);
1460                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1461                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
1462                 break;
1463             }
1464             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1465             {
1466                 header_tmp = (CONTROL_MSG *) header;
1467                 void *msg = (void*)(header_tmp->source_addr);
1468                 int cur_seq = CmiGetMsgSeq(msg);
1469                 int offset = ONE_SEG*(cur_seq+1);
1470                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1471                 buffered_send_msg -= header_tmp->length;
1472                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1473                 if (remain_size < 0) remain_size = 0;
1474                 CmiSetMsgSize(msg, remain_size);
1475                 if(remain_size <= 0) //transaction done
1476                 {
1477                     CmiFree(msg);
1478                 }else if (header_tmp->total_length > offset)
1479                 {
1480                     CmiSetMsgSeq(msg, cur_seq+1);
1481                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1482                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1483                     //send next seg
1484                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1485                          // pipelining
1486                     if (header_tmp->seq_id == 1) {
1487                       int i;
1488                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1489                         int seq = cur_seq+i+2;
1490                         CmiSetMsgSeq(msg, seq-1);
1491                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1492                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1493                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1494                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1495                       }
1496                     }
1497                 }
1498                 break;
1499             }
1500 #if CMK_PERSISTENT_COMM
1501             case PUT_DONE_TAG: //persistent message
1502                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1503                 int size = ((CONTROL_MSG *) header)->length;
1504                 CmiReference(msg);
1505                 handleOneRecvedMsg(size, msg); 
1506                 break;
1507 #endif
1508 #ifdef CMK_DIRECT
1509             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1510                 (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1511                 break;
1512 #endif
1513             default: {
1514                 printf("weird tag problem\n");
1515                 CmiAbort("Unknown tag\n");
1516                      }
1517             }               // end switch
1518 #if PRINT_SYH
1519             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1520 #endif
1521             CMI_GNI_LOCK
1522             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1523             CMI_GNI_UNLOCK
1524             smsg_recv_count ++;
1525             msg_tag = GNI_SMSG_ANY_TAG;
1526         } //endwhile getNext
1527     }   //end while GetEvent
1528     if(status == GNI_RC_ERROR_RESOURCE)
1529     {
1530         GNI_RC_CHECK("Smsg_rx_cq full", status);
1531     }
1532 }
1533
1534 static void printDesc(gni_post_descriptor_t *pd)
1535 {
1536     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1537 }
1538
1539 // for BIG_MSG called on receiver side for receiving control message
1540 // LMSG_INIT_TAG
1541 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1542 {
1543 #if     USE_LRTS_MEMPOOL
1544     CONTROL_MSG         *request_msg;
1545     gni_return_t        status = GNI_RC_SUCCESS;
1546     void                *msg_data;
1547     gni_post_descriptor_t *pd;
1548     gni_mem_handle_t    msg_mem_hndl;
1549     int source, size, transaction_size, offset = 0;
1550     int     register_size = 0;
1551
1552     // initial a get to transfer data from the sender side */
1553     request_msg = (CONTROL_MSG *) header;
1554     size = request_msg->total_length;
1555     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1556     if(request_msg->seq_id < 2)   {
1557         msg_data = CmiAlloc(size);
1558         CmiSetMsgSeq(msg_data, 0);
1559         _MEMCHECK(msg_data);
1560     }
1561     else {
1562         offset = ONE_SEG*(request_msg->seq_id-1);
1563         msg_data = (char*)request_msg->dest_addr + offset;
1564     }
1565    
1566     MallocPostDesc(pd);
1567     pd->cqwrite_value = request_msg->seq_id;
1568     if( request_msg->seq_id == 0)
1569     {
1570         pd->local_mem_hndl= GetMemHndl(msg_data);
1571         transaction_size = ALIGN64(size);
1572         if(IsMemHndlZero(pd->local_mem_hndl))
1573         {   
1574             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1575             if(status == GNI_RC_SUCCESS)
1576             {
1577                 pd->local_mem_hndl = GetMemHndl(msg_data);
1578             }
1579             else
1580             {
1581                 SetMemHndlZero(pd->local_mem_hndl);
1582             }
1583         }
1584         if(NoMsgInRecv( (void*)(msg_data)))
1585             register_size = GetMempoolsize((void*)(msg_data));
1586         else
1587             register_size = 0;
1588     }
1589     else{
1590         transaction_size = ALIGN64(request_msg->length);
1591         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1592         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1593         {
1594             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1595         }
1596     }
1597     pd->first_operand = ALIGN64(size);                   //  total length
1598
1599     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1600         pd->type            = GNI_POST_FMA_GET;
1601     else
1602         pd->type            = GNI_POST_RDMA_GET;
1603 #if REMOTE_EVENT
1604     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1605 #else
1606     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1607 #endif
1608     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1609     pd->length          = transaction_size;
1610     pd->local_addr      = (uint64_t) msg_data;
1611     pd->remote_addr     = request_msg->source_addr + offset;
1612     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1613     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1614     pd->rdma_mode       = 0;
1615     pd->amo_cmd         = 0;
1616
1617     //memory registration success
1618     if(status == GNI_RC_SUCCESS)
1619     {
1620         CMI_GNI_LOCK
1621         if(pd->type == GNI_POST_RDMA_GET) 
1622             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1623         else
1624             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1625         CMI_GNI_UNLOCK
1626
1627         if(status == GNI_RC_SUCCESS )
1628         {
1629             if(pd->cqwrite_value == 0)
1630             {
1631 #if MACHINE_DEBUG_LOG
1632                 buffered_recv_msg += register_size;
1633                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1634 #endif
1635                 IncreaseMsgInRecv(msg_data);
1636             }
1637         }
1638     }else
1639     {
1640         SetMemHndlZero((pd->local_mem_hndl));
1641     }
1642     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1643     {
1644         bufferRdmaMsg(inst_id, pd); 
1645     }else {
1646          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1647         GNI_RC_CHECK("GetLargeAFter posting", status);
1648     }
1649 #else
1650     CONTROL_MSG         *request_msg;
1651     gni_return_t        status;
1652     void                *msg_data;
1653     gni_post_descriptor_t *pd;
1654     RDMA_REQUEST        *rdma_request_msg;
1655     gni_mem_handle_t    msg_mem_hndl;
1656     //int source;
1657     // initial a get to transfer data from the sender side */
1658     request_msg = (CONTROL_MSG *) header;
1659     msg_data = CmiAlloc(request_msg->length);
1660     _MEMCHECK(msg_data);
1661
1662     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1663
1664     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1665     {
1666         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1667     }
1668
1669     MallocPostDesc(pd);
1670     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1671         pd->type            = GNI_POST_FMA_GET;
1672     else
1673         pd->type            = GNI_POST_RDMA_GET;
1674 #if REMOTE_EVENT
1675     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1676 #else
1677     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1678 #endif
1679     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1680     pd->length          = ALIGN64(request_msg->length);
1681     pd->local_addr      = (uint64_t) msg_data;
1682     pd->remote_addr     = request_msg->source_addr;
1683     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1684     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1685     pd->rdma_mode       = 0;
1686     pd->amo_cmd         = 0;
1687
1688     //memory registration successful
1689     if(status == GNI_RC_SUCCESS)
1690     {
1691         pd->local_mem_hndl  = msg_mem_hndl;
1692         CMI_GNI_LOCK
1693         if(pd->type == GNI_POST_RDMA_GET) 
1694             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1695         else
1696             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1697         CMI_GNI_UNLOCK
1698     }else
1699     {
1700         SetMemHndlZero(pd->local_mem_hndl);
1701     }
1702     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1703     {
1704         MallocRdmaRequest(rdma_request_msg);
1705         rdma_request_msg->next = 0;
1706         rdma_request_msg->destNode = inst_id;
1707         rdma_request_msg->pd = pd;
1708         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1709     }else {
1710         GNI_RC_CHECK("AFter posting", status);
1711     }
1712 #endif
1713 }
1714
1715 static void PumpLocalRdmaTransactions()
1716 {
1717     gni_cq_entry_t          ev;
1718     gni_return_t            status;
1719     uint64_t                type, inst_id;
1720     gni_post_descriptor_t   *tmp_pd;
1721     MSG_LIST                *ptr;
1722     CONTROL_MSG             *ack_msg_tmp;
1723     uint8_t                 msg_tag;
1724 #ifdef CMK_DIRECT
1725     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1726 #endif
1727     SMSG_QUEUE         *queue = &smsg_queue;
1728
1729     while(1) {
1730         CMI_GNI_LOCK 
1731         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1732         CMI_GNI_UNLOCK
1733         if(status != GNI_RC_SUCCESS) break;
1734         
1735         type = GNI_CQ_GET_TYPE(ev);
1736         if (type == GNI_CQ_EVENT_TYPE_POST)
1737         {
1738             inst_id     = GNI_CQ_GET_INST_ID(ev);
1739 #if PRINT_SYH
1740             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1741 #endif
1742             CMI_GNI_LOCK
1743             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1744             CMI_GNI_UNLOCK
1745 #ifdef CMK_DIRECT
1746             if(tmp_pd->amo_cmd == 1)
1747             {
1748                 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1749                 cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
1750                 cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
1751                 cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
1752             }
1753             else{
1754                 MallocControlMsg(ack_msg_tmp);
1755                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1756                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1757             }
1758 #else
1759             MallocControlMsg(ack_msg_tmp);
1760             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1761             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1762 #endif
1763             ////Message is sent, free message , put is not used now
1764             switch (tmp_pd->type) {
1765 #if CMK_PERSISTENT_COMM
1766             case GNI_POST_RDMA_PUT:
1767 #if ! USE_LRTS_MEMPOOL
1768                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1769 #endif
1770             case GNI_POST_FMA_PUT:
1771                 CmiFree((void *)tmp_pd->local_addr);
1772                 msg_tag = PUT_DONE_TAG;
1773                 break;
1774 #endif
1775 #ifdef CMK_DIRECT
1776             case GNI_POST_RDMA_PUT:
1777             case GNI_POST_FMA_PUT:
1778                 //sender ACK to receiver to trigger it is done
1779                 msg_tag = DIRECT_PUT_DONE_TAG;
1780                 break;
1781 #endif
1782             case GNI_POST_RDMA_GET:
1783             case GNI_POST_FMA_GET:
1784 #if  ! USE_LRTS_MEMPOOL
1785                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1786                 msg_tag = ACK_TAG;  
1787 #else
1788                 ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
1789                 if(ack_msg_tmp->seq_id > 0)      // BIG_MSG
1790                 {
1791                     msg_tag = BIG_MSG_TAG; 
1792                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1793                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1794                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1795                 } 
1796                 else
1797                 {
1798                     msg_tag = ACK_TAG;  
1799                     ack_msg_tmp->dest_addr = tmp_pd->local_addr;
1800                 }
1801                 ack_msg_tmp->length = tmp_pd->length;
1802                 ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1803 #endif
1804                 break;
1805             default:
1806                 CmiPrintf("type=%d\n", tmp_pd->type);
1807                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1808             }
1809 #if CMK_DIRECT
1810             if(tmp_pd->amo_cmd == 1)
1811                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1812             else
1813 #endif
1814                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
1815             if(status == GNI_RC_SUCCESS)
1816             {
1817 #if CMK_DIRECT
1818                 if(tmp_pd->amo_cmd == 1)
1819                     free(cmk_direct_done_msg); 
1820                 else
1821 #endif
1822                     FreeControlMsg(ack_msg_tmp);
1823
1824             }
1825 #if CMK_PERSISTENT_COMM
1826             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1827 #endif
1828             {
1829                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1830 #if PRINT_SYH
1831                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1832 #endif
1833 #if CMK_SMP_TRACE_COMMTHREAD
1834                     START_EVENT();
1835 #endif
1836                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1837                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1838 #if MACHINE_DEBUG_LOG
1839                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1840                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1841                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1842 #endif
1843 #if CMK_SMP_TRACE_COMMTHREAD
1844                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1845 #endif
1846                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1847                 }else if(msg_tag == BIG_MSG_TAG){
1848                     void *msg = (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1849                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1850                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1851 #if CMK_SMP_TRACE_COMMTHREAD
1852                         START_EVENT();
1853 #endif
1854 #if PRINT_SYH
1855                         printf("Pipeline msg done [%d]\n", myrank);
1856 #endif
1857 #if CMK_SMP_TRACE_COMMTHREAD
1858                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1859 #endif
1860                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1861                     }
1862                 }
1863             }
1864             FreePostDesc(tmp_pd);
1865         }
1866     } //end while
1867     if(status == GNI_RC_ERROR_RESOURCE)
1868     {
1869         GNI_RC_CHECK("Smsg_tx_cq full", status);
1870     }
1871 }
1872
1873 static void  SendRdmaMsg()
1874 {
1875     gni_return_t            status = GNI_RC_SUCCESS;
1876     gni_mem_handle_t        msg_mem_hndl;
1877     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
1878     RDMA_REQUEST            *pre = 0;
1879     int i, register_size = 0;
1880     void                    *msg;
1881 #if CMK_SMP
1882     int len = PCQueueLength(sendRdmaBuf);
1883     for (i=0; i<len; i++)
1884     {
1885         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
1886         if (ptr == NULL) break;
1887 #else
1888     ptr = sendRdmaBuf;
1889     while (ptr!=0)
1890     {
1891 #endif 
1892         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1893         gni_post_descriptor_t *pd = ptr->pd;
1894         status = GNI_RC_SUCCESS;
1895         
1896         if(pd->cqwrite_value == 0)
1897         {
1898             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
1899             {
1900                 msg = (void*)(pd->local_addr);
1901                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1902                 if(status == GNI_RC_SUCCESS)
1903                 {
1904                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1905                 }
1906             }else
1907             {
1908                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1909             }
1910             if(NoMsgInRecv( (void*)(pd->local_addr)))
1911                 register_size = GetMempoolsize((void*)(pd->local_addr));
1912             else
1913                 register_size = 0;
1914         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
1915         {
1916             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
1917         }
1918         if(status == GNI_RC_SUCCESS)        //mem register good
1919         {
1920             CMI_GNI_LOCK
1921             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1922                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1923             else
1924                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1925             CMI_GNI_UNLOCK
1926             if(status == GNI_RC_SUCCESS)    //post good
1927             {
1928 #if !CMK_SMP
1929                 tmp_ptr = ptr;
1930                 if(pre != 0) {
1931                     pre->next = ptr->next;
1932                 }
1933                 else {
1934                     sendRdmaBuf = ptr->next;
1935                 }
1936                 ptr = ptr->next;
1937                 FreeRdmaRequest(tmp_ptr);
1938 #endif
1939                 if(pd->cqwrite_value == 0)
1940                 {
1941                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
1942                 }
1943 #if MACHINE_DEBUG_LOG
1944                 buffered_recv_msg += register_size;
1945                 MACHSTATE(8, "GO request from buffered\n"); 
1946 #endif
1947             }else           // cannot post
1948             {
1949 #if CMK_SMP
1950                 PCQueuePush(sendRdmaBuf, (char*)ptr);
1951 #else
1952                 pre = ptr;
1953                 ptr = ptr->next;
1954 #endif
1955                 break;
1956             }
1957         } else          //memory registration fails
1958         {
1959 #if CMK_SMP
1960             PCQueuePush(sendRdmaBuf, (char*)ptr);
1961 #else
1962             pre = ptr;
1963             ptr = ptr->next;
1964 #endif
1965         }
1966     } //end while
1967 #if ! CMK_SMP
1968     if(ptr == 0)
1969         sendRdmaTail = pre;
1970 #endif
1971 }
1972
1973 // return 1 if all messages are sent
1974 static int SendBufferMsg(SMSG_QUEUE *queue)
1975 {
1976     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
1977     CONTROL_MSG         *control_msg_tmp;
1978     gni_return_t        status;
1979     int                 done = 1;
1980     int                 register_size;
1981     void                *register_addr;
1982     int                 index_previous = -1;
1983     int                 index = queue->smsg_head_index;
1984 #if CMI_EXERT_SEND_CAP
1985     int                 sent_cnt = 0;
1986 #endif
1987
1988 #if !CMK_SMP
1989     index = queue->smsg_head_index;
1990 #else
1991     index = 0;
1992 #endif
1993 #if CMK_SMP
1994     while(index <mysize)
1995     {
1996         int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
1997         for (i=0; i<len; i++) 
1998         {
1999             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
2000             if (ptr == 0) break;
2001 #else
2002     while(index != -1)
2003     {
2004         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2005         pre = 0;
2006         while(ptr != 0)
2007         {
2008 #endif
2009             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
2010             status = GNI_RC_ERROR_RESOURCE;
2011             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
2012                 /* connection not exists yet */
2013             }
2014             else
2015             switch(ptr->tag)
2016             {
2017             case SMALL_DATA_TAG:
2018                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2019                 if(status == GNI_RC_SUCCESS)
2020                 {
2021                     CmiFree(ptr->msg);
2022                 }
2023                 break;
2024             case LMSG_INIT_TAG:
2025                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2026                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2027                 break;
2028             case   ACK_TAG:
2029             case   BIG_MSG_TAG:
2030                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
2031                 if(status == GNI_RC_SUCCESS)
2032                 {
2033                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2034                 }
2035                 break;
2036 #ifdef CMK_DIRECT
2037             case DIRECT_PUT_DONE_TAG:
2038                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2039                 if(status == GNI_RC_SUCCESS)
2040                 {
2041                     free((CMK_DIRECT_HEADER*)ptr->msg);
2042                 }
2043                 break;
2044
2045 #endif
2046             default:
2047                 printf("Weird tag\n");
2048                 CmiAbort("should not happen\n");
2049             }       // end switch
2050             if(status == GNI_RC_SUCCESS)
2051             {
2052 #if !CMK_SMP
2053                 tmp_ptr = ptr;
2054                 if(pre)
2055                 {
2056                     ptr = pre ->next = ptr->next;
2057                 }else
2058                 {
2059                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2060                 }
2061                 FreeMsgList(tmp_ptr);
2062 #else
2063                 FreeMsgList(ptr);
2064 #endif
2065 #if PRINT_SYH
2066                 buffered_smsg_counter--;
2067                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2068 #endif
2069 #if CMI_EXERT_SEND_CAP
2070                 sent_cnt++;
2071                 if(sent_cnt == SEND_CAP)
2072                     break;
2073 #endif
2074             }else {
2075 #if CMK_SMP
2076                 PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
2077 #else
2078                 pre = ptr;
2079                 ptr=ptr->next;
2080 #endif
2081                 done = 0;
2082                 if(status == GNI_RC_ERROR_RESOURCE)
2083                     break;
2084             } 
2085         } //end while
2086 #if !CMK_SMP
2087         if(ptr == 0)
2088             queue->smsg_msglist_index[index].tail = pre;
2089         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2090         {
2091             if(index_previous != -1)
2092                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2093             else
2094                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2095         }else
2096         {index_previous = index;
2097         }
2098         index = queue->smsg_msglist_index[index].next;
2099 #else
2100         index++;
2101 #endif
2102
2103 #if CMI_EXERT_SEND_CAP
2104         if(sent_cnt == SEND_CAP)
2105                 break;
2106 #endif
2107     }   // end pooling for all cores
2108     return done;
2109 }
2110
2111 static void ProcessDeadlock();
2112 void LrtsAdvanceCommunication(int whileidle)
2113 {
2114     /*  Receive Msg first */
2115 #if CMK_SMP_TRACE_COMMTHREAD
2116     double startT, endT;
2117 #endif
2118     if (useDynamicSMSG && whileidle)
2119     {
2120 #if CMK_SMP_TRACE_COMMTHREAD
2121         startT = CmiWallTimer();
2122 #endif
2123         PumpDatagramConnection();
2124 #if CMK_SMP_TRACE_COMMTHREAD
2125         endT = CmiWallTimer();
2126         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2127 #endif
2128     }
2129
2130 #if CMK_SMP_TRACE_COMMTHREAD
2131     startT = CmiWallTimer();
2132 #endif
2133     PumpNetworkSmsg();
2134     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2135 #if CMK_SMP_TRACE_COMMTHREAD
2136     endT = CmiWallTimer();
2137     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2138 #endif
2139
2140 #if CMK_SMP_TRACE_COMMTHREAD
2141     startT = CmiWallTimer();
2142 #endif
2143     PumpLocalRdmaTransactions();
2144     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2145 #if CMK_SMP_TRACE_COMMTHREAD
2146     endT = CmiWallTimer();
2147     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2148 #endif
2149     /* Send buffered Message */
2150 #if CMK_SMP_TRACE_COMMTHREAD
2151     startT = CmiWallTimer();
2152 #endif
2153 #if CMK_USE_OOB
2154     if (SendBufferMsg(&smsg_oob_queue) == 1)
2155 #endif
2156     SendBufferMsg(&smsg_queue);
2157     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2158 #if CMK_SMP_TRACE_COMMTHREAD
2159     endT = CmiWallTimer();
2160     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2161 #endif
2162
2163 #if CMK_SMP_TRACE_COMMTHREAD
2164     startT = CmiWallTimer();
2165 #endif
2166     SendRdmaMsg();
2167     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2168 #if CMK_SMP_TRACE_COMMTHREAD
2169     endT = CmiWallTimer();
2170     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2171 #endif
2172
2173 #if CMK_SMP
2174     if (_detected_hang)  ProcessDeadlock();
2175 #endif
2176 }
2177
2178 /* useDynamicSMSG */
2179 static void _init_dynamic_smsg()
2180 {
2181     gni_return_t status;
2182     uint32_t     vmdh_index = -1;
2183     int i;
2184
2185     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2186     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2187     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2188     for(i=0; i<mysize; i++) {
2189         smsg_connected_flag[i] = 0;
2190         smsg_attr_vector_local[i] = NULL;
2191         smsg_attr_vector_remote[i] = NULL;
2192     }
2193     if(mysize <=512)
2194     {
2195         SMSG_MAX_MSG = 4096;
2196     }else if (mysize <= 4096)
2197     {
2198         SMSG_MAX_MSG = 4096/mysize * 1024;
2199     }else if (mysize <= 16384)
2200     {
2201         SMSG_MAX_MSG = 512;
2202     }else {
2203         SMSG_MAX_MSG = 256;
2204     }
2205
2206     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2207     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2208     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2209     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2210     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2211
2212     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2213     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2214     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2215     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2216     mailbox_list->offset = 0;
2217     mailbox_list->next = 0;
2218     
2219     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2220         mailbox_list->size, smsg_rx_cqh,
2221         GNI_MEM_READWRITE,   
2222         vmdh_index,
2223         &(mailbox_list->mem_hndl));
2224     GNI_RC_CHECK("MEMORY registration for smsg", status);
2225
2226     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2227     GNI_RC_CHECK("Unbound EP", status);
2228     
2229     alloc_smsg_attr(&send_smsg_attr);
2230
2231     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2232     GNI_RC_CHECK("post unbound datagram", status);
2233
2234       /* always pre-connect to proc 0 */
2235     //if (myrank != 0) connect_to(0);
2236 }
2237
2238 static void _init_static_smsg()
2239 {
2240     gni_smsg_attr_t      *smsg_attr;
2241     gni_smsg_attr_t      remote_smsg_attr;
2242     gni_smsg_attr_t      *smsg_attr_vec;
2243     gni_mem_handle_t     my_smsg_mdh_mailbox;
2244     int      ret, i;
2245     gni_return_t status;
2246     uint32_t              vmdh_index = -1;
2247     mdh_addr_t            base_infor;
2248     mdh_addr_t            *base_addr_vec;
2249     char *env;
2250
2251     if(mysize <=512)
2252     {
2253         SMSG_MAX_MSG = 1024;
2254     }else if (mysize <= 4096)
2255     {
2256         SMSG_MAX_MSG = 1024;
2257     }else if (mysize <= 16384)
2258     {
2259         SMSG_MAX_MSG = 512;
2260     }else {
2261         SMSG_MAX_MSG = 256;
2262     }
2263     
2264     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2265     if (env) SMSG_MAX_MSG = atoi(env);
2266     CmiAssert(SMSG_MAX_MSG > 0);
2267
2268     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2269     
2270     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2271     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2272     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2273     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2274     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2275     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2276     CmiAssert(ret == 0);
2277     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2278     
2279     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2280             smsg_memlen*(mysize), smsg_rx_cqh,
2281             GNI_MEM_READWRITE,   
2282             vmdh_index,
2283             &my_smsg_mdh_mailbox);
2284     register_memory_size += smsg_memlen*(mysize);
2285     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2286
2287     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2288     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2289
2290     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2291     base_infor.mdh =  my_smsg_mdh_mailbox;
2292     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2293
2294     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2295  
2296     for(i=0; i<mysize; i++)
2297     {
2298         if(i==myrank)
2299             continue;
2300         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2301         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2302         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2303         smsg_attr[i].mbox_offset = i*smsg_memlen;
2304         smsg_attr[i].buff_size = smsg_memlen;
2305         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2306         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2307     }
2308
2309     for(i=0; i<mysize; i++)
2310     {
2311         if (myrank == i) continue;
2312
2313         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2314         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2315         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2316         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2317         remote_smsg_attr.buff_size = smsg_memlen;
2318         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2319         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2320
2321         /* initialize the smsg channel */
2322         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2323         GNI_RC_CHECK("SMSG Init", status);
2324     } //end initialization
2325
2326     free(base_addr_vec);
2327     free(smsg_attr);
2328
2329     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2330     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2331
2332
2333 inline
2334 static void _init_send_queue(SMSG_QUEUE *queue)
2335 {
2336      int i;
2337      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2338      for(i =0; i<mysize; i++)
2339      {
2340         queue->smsg_msglist_index[i].next = -1;
2341 #if CMK_SMP
2342         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2343 #else
2344         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2345 #endif
2346         
2347      }
2348      queue->smsg_head_index = -1;
2349 }
2350
2351 inline
2352 static void _init_smsg()
2353 {
2354     if(mysize > 1) {
2355         if (useDynamicSMSG)
2356             _init_dynamic_smsg();
2357         else
2358             _init_static_smsg();
2359     }
2360
2361     _init_send_queue(&smsg_queue);
2362 #if CMK_USE_OOB
2363     _init_send_queue(&smsg_oob_queue);
2364 #endif
2365 }
2366
2367 static void _init_static_msgq()
2368 {
2369     gni_return_t status;
2370     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2371     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2372     msgq_attrs.smsg_q_sz = 1;
2373     msgq_attrs.rcv_pool_sz = 1;
2374     msgq_attrs.num_msgq_eps = 2;
2375     msgq_attrs.nloc_insts = 8;
2376     msgq_attrs.modes = 0;
2377     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2378
2379     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2380     GNI_RC_CHECK("MSGQ Init", status);
2381
2382
2383 }
2384
2385 #if CMK_SMP && STEAL_MEMPOOL
2386 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2387 {
2388     void *pool = NULL;
2389     int i, k;
2390     // check other ranks
2391     for (k=0; k<CmiMyNodeSize()+1; k++) {
2392         i = (CmiMyRank()+k)%CmiMyNodeSize();
2393         if (i==CmiMyRank()) continue;
2394         mempool_type *mptr = CpvAccessOther(mempool, i);
2395         CmiLock(mptr->mempoolLock);
2396         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2397         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2398             CmiUnlock(mptr->mempoolLock);
2399             continue;
2400         }
2401         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2402         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2403             /* search in the free list */
2404           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2405           mempool_header *current = free_header;
2406           while (current) {
2407             if (current->next_free == (char*)header-(char*)mptr) break;
2408             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2409           }
2410           if (current == NULL) {         /* not found in free list */
2411             CmiUnlock(mptr->mempoolLock);
2412             continue;
2413           }
2414 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2415             /* search the previous memblock, and remove the tail */
2416           mempool_block *ptr = (mempool_block *)mptr;
2417           while (ptr) {
2418             if (ptr->memblock_next == mptr->memblock_tail) break;
2419             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2420           }
2421           CmiAssert(ptr!=NULL);
2422           ptr->memblock_next = 0;
2423           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2424
2425             /* remove memblock from the free list */
2426           current->next_free = header->next_free;
2427           if (header == free_header) mptr->freelist_head = header->next_free;
2428
2429           CmiUnlock(mptr->mempoolLock);
2430
2431           pool = (void*)tail;
2432           *mem_hndl = tail->mem_hndl;
2433           *size = tail->size;
2434           return pool;
2435         }
2436         CmiUnlock(mptr->mempoolLock);
2437     }
2438
2439       /* steal failed, deregister and free memblock now */
2440     int freed = 0;
2441     for (k=0; k<CmiMyNodeSize()+1; k++) {
2442         i = (CmiMyRank()+k)%CmiMyNodeSize();
2443         mempool_type *mptr = CpvAccessOther(mempool, i);
2444         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2445
2446         mempool_block *mempools_head = &(mptr->mempools_head);
2447         mempool_block *current = mempools_head;
2448         mempool_block *prev = NULL;
2449
2450         while (current) {
2451           int isfree = 0;
2452           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2453 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2454           mempool_header *cur = free_header;
2455           mempool_header *header;
2456           if (current != mempools_head) {
2457             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2458              /* search in free list */
2459             if (header->size == current->size - sizeof(mempool_block)) {
2460               cur = free_header;
2461               while (cur) {
2462                 if (cur->next_free == (char*)header-(char*)mptr) break;
2463                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2464               }
2465               if (cur != NULL) isfree = 1;
2466             }
2467           }
2468           if (isfree) {
2469               /* remove from free list */
2470             cur->next_free = header->next_free;
2471             if (header == free_header) mptr->freelist_head = header->next_free;
2472              // deregister
2473             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2474             GNI_RC_CHECK("steal Mempool de-register", status);
2475             mempool_block *ptr = current;
2476             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2477             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2478 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2479             freed += ptr->size;
2480             free(ptr);
2481              // try now
2482             if (freed > *size) {
2483               if (pool == NULL) {
2484                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2485                 CmiAssert(ret == 0);
2486               }
2487               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2488               if (status == GNI_RC_SUCCESS) {
2489                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2490 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2491                 return pool;
2492               }
2493 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2494             }
2495           }
2496           else {
2497              prev = current;
2498              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2499           }
2500         }
2501
2502         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2503     }
2504       /* still no luck registering pool */
2505     if (pool) free(pool);
2506     return NULL;
2507 }
2508 #endif
2509
2510 static long long int total_mempool_size = 0;
2511 static long long int total_mempool_calls = 0;
2512
2513 #if USE_LRTS_MEMPOOL
2514 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2515 {
2516     void *pool;
2517     int ret;
2518
2519     int default_size =  expand_flag? _expand_mem : _mempool_size;
2520     if (*size < default_size) *size = default_size;
2521     total_mempool_size += *size;
2522     total_mempool_calls += 1;
2523     if (*size > MAX_REG_MEM) {
2524         printf("Error: A mempool block with size %d is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX.", *size);
2525         CmiAbort("alloc_mempool_block");
2526     }
2527     ret = posix_memalign(&pool, ALIGNBUF, *size);
2528     if (ret != 0) {
2529 #if CMK_SMP && STEAL_MEMPOOL
2530       pool = steal_mempool_block(size, mem_hndl);
2531       if (pool != NULL) return pool;
2532 #endif
2533       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2534       if (ret == ENOMEM)
2535         CmiAbort("alloc_mempool_block: out of memory.");
2536       else
2537         CmiAbort("alloc_mempool_block: posix_memalign failed");
2538     }
2539     SetMemHndlZero((*mem_hndl));
2540     
2541     return pool;
2542 }
2543
2544 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2545 {
2546     if(!(IsMemHndlZero(mem_hndl)))
2547     {
2548         gni_return_t status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2549         GNI_RC_CHECK("free_mempool_block Mempool de-register", status);
2550     }
2551     free(ptr);
2552 }
2553 #endif
2554
2555 void LrtsPreCommonInit(int everReturn){
2556 #if USE_LRTS_MEMPOOL
2557     CpvInitialize(mempool_type*, mempool);
2558     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block);
2559     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2560 #endif
2561 }
2562
2563
2564 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2565 {
2566     register int            i;
2567     int                     rc;
2568     int                     device_id = 0;
2569     unsigned int            remote_addr;
2570     gni_cdm_handle_t        cdm_hndl;
2571     gni_return_t            status = GNI_RC_SUCCESS;
2572     uint32_t                vmdh_index = -1;
2573     uint8_t                 ptag;
2574     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2575     int                     first_spawned;
2576     int                     physicalID;
2577     char                   *env;
2578
2579     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2580     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2581     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2582    
2583     status = PMI_Init(&first_spawned);
2584     GNI_RC_CHECK("PMI_Init", status);
2585
2586     status = PMI_Get_size(&mysize);
2587     GNI_RC_CHECK("PMI_Getsize", status);
2588
2589     status = PMI_Get_rank(&myrank);
2590     GNI_RC_CHECK("PMI_getrank", status);
2591
2592     //physicalID = CmiPhysicalNodeID(myrank);
2593     
2594     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2595
2596     *myNodeID = myrank;
2597     *numNodes = mysize;
2598   
2599 #if MULTI_THREAD_SEND
2600     /* Currently, we only consider the case that comm. thread will only recv msgs */
2601     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2602 #endif
2603
2604     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2605     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2606     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2607
2608     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2609     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2610     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2611
2612     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
2613     if (env) useDynamicSMSG = 1;
2614     if (!useDynamicSMSG)
2615       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2616     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2617     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2618     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2619     
2620     if(myrank == 0)
2621     {
2622         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2623         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2624     }
2625 #ifdef USE_ONESIDED
2626     onesided_init(NULL, &onesided_hnd);
2627
2628     // this is a GNI test, so use the libonesided bypass functionality
2629     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2630     local_addr = gniGetNicAddress();
2631 #else
2632     ptag = get_ptag();
2633     cookie = get_cookie();
2634 #if 0
2635     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2636 #endif
2637     //Create and attach to the communication  domain */
2638     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2639     GNI_RC_CHECK("GNI_CdmCreate", status);
2640     //* device id The device id is the minor number for the device
2641     //that is assigned to the device by the system when the device is created.
2642     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2643     //where X is the device number 0 default 
2644     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2645     GNI_RC_CHECK("GNI_CdmAttach", status);
2646     local_addr = get_gni_nic_address(0);
2647 #endif
2648     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2649     _MEMCHECK(MPID_UGNI_AllAddr);
2650     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2651     /* create the local completion queue */
2652     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2653     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2654     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2655     
2656     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2657     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2658     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2659
2660     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2661     GNI_RC_CHECK("Create CQ (rx)", status);
2662     
2663     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2664     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2665     
2666     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2667     //GNI_RC_CHECK("Create BTE CQ", status);
2668
2669     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2670     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2671     _MEMCHECK(ep_hndl_array);
2672 #if CMK_SMP && !COMM_THREAD_SEND
2673     tx_cq_lock  = CmiCreateLock();
2674     rx_cq_lock  = CmiCreateLock();
2675 #endif
2676     for (i=0; i<mysize; i++) {
2677         if(i == myrank) continue;
2678         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2679         GNI_RC_CHECK("GNI_EpCreate ", status);   
2680         remote_addr = MPID_UGNI_AllAddr[i];
2681         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2682         GNI_RC_CHECK("GNI_EpBind ", status);   
2683     }
2684
2685     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2686     _init_smsg();
2687     PMI_Barrier();
2688
2689 #if     USE_LRTS_MEMPOOL
2690     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
2691     if (env) _mempool_size = CmiReadSize(env);
2692     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
2693         _mempool_size = CmiReadSize(env);
2694
2695
2696     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2697     if (env) MAX_REG_MEM = CmiReadSize(env);
2698     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size")) 
2699         MAX_REG_MEM = CmiReadSize(env);
2700
2701     env = getenv("CHARM_UGNI_SEND_MAX");
2702     if (env) MAX_BUFF_SEND = CmiReadSize(env);
2703     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send")) 
2704         MAX_BUFF_SEND = CmiReadSize(env);
2705
2706     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2707     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2708
2709     if (myrank==0) {
2710         printf("Charm++> memory pool init size: %1.fMB\n", _mempool_size/1024.0/1024);
2711         printf("Charm++> memory pool max size: %1.fMB, max for send: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
2712         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
2713             /* memblock can expand to BIG_MSG * 2 size */
2714             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
2715             CmiAbort("mempool maximum size is too small. \n");
2716         }
2717 #if CMK_SMP && MULTI_THREAD_SEND
2718         printf("Charm++> worker thread sending messages\n");
2719 #elif CMK_SMP && COMM_THREAD_SEND
2720         printf("Charm++> only comm thread send/recv messages\n");
2721 #endif
2722     }
2723 #endif
2724
2725     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
2726     if (env) {
2727         BIG_MSG = CmiReadSize(env);
2728         if (BIG_MSG < ONE_SEG)
2729           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
2730     }
2731     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
2732     if (env) {
2733         BIG_MSG_PIPELINE = atoi(env);
2734     }
2735
2736     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
2737     if (env) _checkProgress = 0;
2738     if (mysize == 1) _checkProgress = 0;
2739
2740     /* init DMA buffer for medium message */
2741
2742     //_init_DMA_buffer();
2743     
2744     free(MPID_UGNI_AllAddr);
2745 #if CMK_SMP
2746     sendRdmaBuf = PCQueueCreate();
2747 #else
2748     sendRdmaBuf = 0;
2749 #endif
2750 #if MACHINE_DEBUG_LOG
2751     char ln[200];
2752     sprintf(ln,"debugLog.%d",myrank);
2753     debugLog=fopen(ln,"w");
2754 #endif
2755
2756 }
2757
2758 void* LrtsAlloc(int n_bytes, int header)
2759 {
2760     void *ptr;
2761 #if 0
2762     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2763 #endif
2764     if(n_bytes <= SMSG_MAX_MSG)
2765     {
2766         int totalsize = n_bytes+header;
2767         ptr = malloc(totalsize);
2768     }
2769     else {
2770         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2771 #if     USE_LRTS_MEMPOOL
2772         n_bytes = ALIGN64(n_bytes);
2773         if(n_bytes < BIG_MSG)
2774         {
2775             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2776             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2777         }else 
2778         {
2779             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2780             ptr = res + ALIGNBUF - header;
2781
2782         }
2783 #else
2784         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2785         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2786         ptr = res + ALIGNBUF - header;
2787 #endif
2788     }
2789     return ptr;
2790 }
2791
2792 void  LrtsFree(void *msg)
2793 {
2794     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2795     if (size <= SMSG_MAX_MSG)
2796       free(msg);
2797     else if(size>=BIG_MSG)
2798     {
2799         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2800
2801     }else
2802     {
2803 #if     USE_LRTS_MEMPOOL
2804 #if CMK_SMP
2805         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2806 #else
2807         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2808 #endif
2809 #else
2810         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2811 #endif
2812     }
2813 }
2814
2815 void LrtsExit()
2816 {
2817     /* free memory ? */
2818 #if USE_LRTS_MEMPOOL
2819     mempool_destroy(CpvAccess(mempool));
2820 #endif
2821     PMI_Finalize();
2822     exit(0);
2823 }
2824
2825 void LrtsDrainResources()
2826 {
2827     if(mysize == 1) return;
2828     while (
2829 #if CMK_USE_OOB
2830            !SendBufferMsg(&smsg_oob_queue) ||
2831 #endif
2832            !SendBufferMsg(&smsg_queue))
2833     {
2834         if (useDynamicSMSG)
2835             PumpDatagramConnection();
2836         PumpNetworkSmsg();
2837         PumpLocalRdmaTransactions();
2838         SendRdmaMsg();
2839     }
2840     PMI_Barrier();
2841 }
2842
2843 void LrtsAbort(const char *message) {
2844     printf("CmiAbort is calling on PE:%d\n", myrank);
2845     CmiPrintStackTrace(0);
2846     PMI_Abort(-1, message);
2847 }
2848
2849 /**************************  TIMER FUNCTIONS **************************/
2850 #if CMK_TIMER_USE_SPECIAL
2851 /* MPI calls are not threadsafe, even the timer on some machines */
2852 static CmiNodeLock  timerLock = 0;
2853 static int _absoluteTime = 0;
2854 static int _is_global = 0;
2855 static struct timespec start_ts;
2856
2857 inline int CmiTimerIsSynchronized() {
2858     return 0;
2859 }
2860
2861 inline int CmiTimerAbsolute() {
2862     return _absoluteTime;
2863 }
2864
2865 double CmiStartTimer() {
2866     return 0.0;
2867 }
2868
2869 double CmiInitTime() {
2870     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
2871 }
2872
2873 void CmiTimerInit(char **argv) {
2874     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2875     if (_absoluteTime && CmiMyPe() == 0)
2876         printf("Charm++> absolute  timer is used\n");
2877     
2878     _is_global = CmiTimerIsSynchronized();
2879
2880
2881     if (_is_global) {
2882         if (CmiMyRank() == 0) {
2883             clock_gettime(CLOCK_MONOTONIC, &start_ts);
2884         }
2885     } else { /* we don't have a synchronous timer, set our own start time */
2886         CmiBarrier();
2887         CmiBarrier();
2888         CmiBarrier();
2889         clock_gettime(CLOCK_MONOTONIC, &start_ts);
2890     }
2891     CmiNodeAllBarrier();          /* for smp */
2892 }
2893
2894 /**
2895  * Since the timerLock is never created, and is
2896  * always NULL, then all the if-condition inside
2897  * the timer functions could be disabled right
2898  * now in the case of SMP.
2899  */
2900 double CmiTimer(void) {
2901     struct timespec now_ts;
2902     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2903     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2904         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2905 }
2906
2907 double CmiWallTimer(void) {
2908     struct timespec now_ts;
2909     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2910     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2911         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
2912 }
2913
2914 double CmiCpuTimer(void) {
2915     struct timespec now_ts;
2916     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2917     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2918         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2919 }
2920
2921 #endif
2922 /************Barrier Related Functions****************/
2923
2924 int CmiBarrier()
2925 {
2926     gni_return_t status;
2927
2928 #if CMK_SMP
2929     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2930     CmiNodeAllBarrier();
2931     if (CmiMyRank() == CmiMyNodeSize())
2932 #else
2933     if (CmiMyRank() == 0)
2934 #endif
2935     {
2936         /**
2937          *  The call of CmiBarrier is usually before the initialization
2938          *  of trace module of Charm++, therefore, the START_EVENT
2939          *  and END_EVENT are disabled here. -Chao Mei
2940          */
2941         /*START_EVENT();*/
2942         status = PMI_Barrier();
2943         GNI_RC_CHECK("PMI_Barrier", status);
2944         /*END_EVENT(10);*/
2945     }
2946     CmiNodeAllBarrier();
2947     return status;
2948
2949 }
2950 #if CMK_DIRECT
2951 #include "machine-cmidirect.c"
2952 #endif
2953 #if CMK_PERSISTENT_COMM
2954 #include "machine-persistent.c"
2955 #endif
2956
2957