minor fix
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
12     export CHARM_UGNI_MEMPOOL_MAX=20M
13     export CHARM_UGNI_SEND_MAX=10M
14
15     CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
16     CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
17
18     other environment variables:
19
20     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes      # disable checking deadlock
21  */
22 /*@{*/
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <stdint.h>
27 #include <errno.h>
28 #include <malloc.h>
29 #include <unistd.h>
30
31 #include <gni_pub.h>
32 #include <pmi.h>
33 #include <time.h>
34
35 #include "converse.h"
36
37 #define PRINT_SYH  0
38
39 // Trace communication thread
40 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
41 #define TRACE_THRESHOLD     0.00005
42 #define CMI_MPI_TRACE_MOREDETAILED 0
43 #undef CMI_MPI_TRACE_USEREVENTS
44 #define CMI_MPI_TRACE_USEREVENTS 1
45 #else
46 #undef CMK_SMP_TRACE_COMMTHREAD
47 #define CMK_SMP_TRACE_COMMTHREAD 0
48 #endif
49
50 #define CMK_TRACE_COMMOVERHEAD 0
51 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
52 #undef CMI_MPI_TRACE_USEREVENTS
53 #define CMI_MPI_TRACE_USEREVENTS 1
54 #else
55 #undef CMK_TRACE_COMMOVERHEAD
56 #define CMK_TRACE_COMMOVERHEAD 0
57 #endif
58
59 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
60 CpvStaticDeclare(double, projTraceStart);
61 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
62 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
63 #else
64 #define  START_EVENT()
65 #define  END_EVENT(x)
66 #endif
67
68 #define CMI_EXERT_SEND_CAP      0
69 #define CMI_EXERT_RECV_CAP      0
70
71 #if CMI_EXERT_SEND_CAP
72 #define SEND_CAP 16
73 #endif
74
75 #if CMI_EXERT_RECV_CAP
76 #define RECV_CAP 2
77 #endif
78
79 #define REMOTE_EVENT                      0
80 #define USE_LRTS_MEMPOOL                  1
81
82 #if USE_LRTS_MEMPOOL
83
84 #define oneMB (1024ll*1024)
85 static CmiInt8 _mempool_size = 8*oneMB;
86 static CmiInt8 _expand_mem =  4*oneMB;
87
88 #endif
89
90 #if CMK_SMP && COMM_THREAD_SEND 
91 //Dynamic flow control about memory registration
92 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
93 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
94 #else
95 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
96 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
97 #endif
98
99 static CmiInt8 buffered_send_msg = 0;
100 static int register_memory_size = 0;
101
102 static int BIG_MSG  =  4*oneMB;
103 static int  ONE_SEG  = 2*oneMB;
104 #define BIG_MSG_PIPELINE         2
105
106 #if CMK_SMP
107 #define COMM_THREAD_SEND 1
108 //#define MULTI_THREAD_SEND 1
109 #endif
110
111 #if CMK_SMP && MULTI_THREAD_SEND
112 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
113 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
114 #else
115 #define     CMI_GNI_LOCK
116 #define     CMI_GNI_UNLOCK
117 #endif
118
119 static int _checkProgress = 1;             /* check deadlock */
120 static int _detected_hang = 0;
121
122 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
123
124 // dynamic SMSG
125 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
126
127 static int avg_smsg_connection = 32;
128 static int                 *smsg_connected_flag= 0;
129 static gni_smsg_attr_t     **smsg_attr_vector_local;
130 static gni_smsg_attr_t     **smsg_attr_vector_remote;
131 static gni_ep_handle_t     ep_hndl_unbound;
132 static gni_smsg_attr_t     send_smsg_attr;
133 static gni_smsg_attr_t     recv_smsg_attr;
134
135 typedef struct _dynamic_smsg_mailbox{
136    void     *mailbox_base;
137    int      size;
138    int      offset;
139    gni_mem_handle_t  mem_hndl;
140    struct      _dynamic_smsg_mailbox  *next;
141 }dynamic_smsg_mailbox_t;
142
143 static dynamic_smsg_mailbox_t  *mailbox_list;
144
145 int         rdma_id = 0;
146
147 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
148 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
149
150 #if PRINT_SYH
151 int         lrts_send_msg_id = 0;
152 int         lrts_local_done_msg = 0;
153 int         lrts_send_rdma_success = 0;
154 #endif
155
156 #include "machine.h"
157
158 #include "pcqueue.h"
159
160 #include "mempool.h"
161
162 #if CMK_PERSISTENT_COMM
163 #include "machine-persistent.h"
164 #endif
165
166 //#define  USE_ONESIDED 1
167 #ifdef USE_ONESIDED
168 //onesided implementation is wrong, since no place to restore omdh
169 #include "onesided.h"
170 onesided_hnd_t   onesided_hnd;
171 onesided_md_t    omdh;
172 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
173
174 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
175
176 #else
177 uint8_t   onesided_hnd, omdh;
178 #if REMOTE_EVENT
179 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
180          status = GNI_RC_ERROR_NOMEM;} \
181         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
182                 if(status == GNI_RC_SUCCESS) register_memory_size += size;} }
183 #else
184 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
185     do {   \
186         if (register_memory_size + size >= MAX_REG_MEM) { \
187             status = GNI_RC_ERROR_NOMEM; \
188         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
189             if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
190     } while(0)
191 #endif
192 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
193     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
194              register_memory_size -= size; \
195          else CmiAbort("MEM_DEregister");  \
196     } while (0)
197 #endif
198
199 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
200 #define   DecreaseMsgInRecv(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)--
201 #define   IncreaseMsgInSend(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)++
202                                    //printf("++++[%d]%p   ----- %d\n", myrank, ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
203 #define   DecreaseMsgInSend(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)--
204                                      //printf("---[%d]%p   ----- %d\n", myrank,((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
205 #define   GetMempoolBlockPtr(x)  ((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr
206 #define   GetMempoolPtr(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mptr
207 #define   GetMempoolsize(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->size
208 #define   GetMemHndl(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mem_hndl
209 #define   GetMemHndlFromHeader(x) ((block_header*)x)->mem_hndl
210 #define   GetSizeFromHeader(x) ((block_header*)x)->size
211 #define   NoMsgInSend(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send == 0
212 #define   NoMsgInRecv(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv == 0
213 #define   NoMsgInFlight(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send + ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv  == 0
214 #define   IsMemHndlZero(x)  ((x).qword1 == 0 && (x).qword2 == 0)
215 #define   SetMemHndlZero(x)  do { (x).qword1 = 0; (x).qword2 = 0; } while (0)
216 #define   NotRegistered(x)  IsMemHndlZero(((block_header*)x)->mem_hndl)
217
218 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
219 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
220 #define CmiGetMsgSeq(m)  ((CmiMsgHeaderExt*)m)->seq
221 #define CmiSetMsgSeq(m, s)  ((((CmiMsgHeaderExt*)m)->seq) = (s))
222
223 #define ALIGNBUF                64
224
225 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
226 /* If SMSG is not used */
227
228 #define FMA_PER_CORE  1024
229 #define FMA_BUFFER_SIZE 1024
230
231 /* If SMSG is used */
232 static int  SMSG_MAX_MSG = 1024;
233 #define SMSG_MAX_CREDIT 72 
234
235 #define MSGQ_MAXSIZE       2048
236 /* large message transfer with FMA or BTE */
237 #define LRTS_GNI_RDMA_THRESHOLD  1024 
238
239 #if CMK_SMP
240 static int  REMOTE_QUEUE_ENTRIES=163840; 
241 static int LOCAL_QUEUE_ENTRIES=163840; 
242 #else
243 static int  REMOTE_QUEUE_ENTRIES=20480;
244 static int LOCAL_QUEUE_ENTRIES=20480; 
245 #endif
246
247 #define BIG_MSG_TAG  0x26
248 #define PUT_DONE_TAG      0x28
249 #define DIRECT_PUT_DONE_TAG      0x29
250 #define ACK_TAG           0x30
251 /* SMSG is data message */
252 #define SMALL_DATA_TAG          0x31
253 /* SMSG is a control message to initialize a BTE */
254 #define MEDIUM_HEAD_TAG         0x32
255 #define MEDIUM_DATA_TAG         0x33
256 #define LMSG_INIT_TAG           0x39 
257 #define VERY_LMSG_INIT_TAG      0x40 
258 #define VERY_LMSG_TAG           0x41 
259
260 #define DEBUG
261 #ifdef GNI_RC_CHECK
262 #undef GNI_RC_CHECK
263 #endif
264 #ifdef DEBUG
265 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
266 #else
267 #define GNI_RC_CHECK(msg,rc)
268 #endif
269
270 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
271 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
272
273 static int useStaticMSGQ = 0;
274 static int useStaticFMA = 0;
275 static int mysize, myrank;
276 gni_nic_handle_t      nic_hndl;
277
278 typedef struct {
279     gni_mem_handle_t mdh;
280     uint64_t addr;
281 } mdh_addr_t ;
282 // this is related to dynamic SMSG
283
284 typedef struct mdh_addr_list{
285     gni_mem_handle_t mdh;
286    void *addr;
287     struct mdh_addr_list *next;
288 }mdh_addr_list_t;
289
290 static unsigned int         smsg_memlen;
291 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
292 mdh_addr_t          setup_mem;
293 mdh_addr_t          *smsg_connection_vec = 0;
294 gni_mem_handle_t    smsg_connection_memhndl;
295 static int          smsg_expand_slots = 10;
296 static int          smsg_available_slot = 0;
297 static void         *smsg_mailbox_mempool = 0;
298 mdh_addr_list_t     *smsg_dynamic_list = 0;
299
300 static void             *smsg_mailbox_base;
301 gni_msgq_attr_t         msgq_attrs;
302 gni_msgq_handle_t       msgq_handle;
303 gni_msgq_ep_attr_t      msgq_ep_attrs;
304 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
305
306 /* =====Beginning of Declarations of Machine Specific Variables===== */
307 static int cookie;
308 static int modes = 0;
309 static gni_cq_handle_t       smsg_rx_cqh = NULL;
310 static gni_cq_handle_t       smsg_tx_cqh = NULL;
311 static gni_cq_handle_t       post_rx_cqh = NULL;
312 static gni_cq_handle_t       post_tx_cqh = NULL;
313 static gni_ep_handle_t       *ep_hndl_array;
314 #if CMK_SMP && MULTI_THREAD_SEND
315 static CmiNodeLock           *ep_lock_array;
316 static CmiNodeLock           tx_cq_lock; 
317 static CmiNodeLock           rx_cq_lock;
318 static CmiNodeLock           *mempool_lock;
319 #endif
320 typedef struct msg_list
321 {
322     uint32_t destNode;
323     uint32_t size;
324     void *msg;
325     uint8_t tag;
326 #if !CMK_SMP
327     struct msg_list *next;
328 #endif
329 }MSG_LIST;
330
331
332 typedef struct control_msg
333 {
334     uint64_t            source_addr;    /* address from the start of buffer  */
335     uint64_t            dest_addr;      /* address from the start of buffer */
336     //int                 source;         /* source rank */
337     int                 total_length;   /* total length */
338     int                 length;         /* length of this packet */
339     uint8_t             seq_id;         //big message   0 meaning single message
340     gni_mem_handle_t    source_mem_hndl;
341     struct control_msg *next;
342 }CONTROL_MSG;
343
344 #ifdef CMK_DIRECT
345 typedef struct{
346     void *recverBuf;
347     void (*callbackFnPtr)(void *);
348     void *callbackData;
349 }CMK_DIRECT_HEADER;
350 #endif
351 typedef struct  rmda_msg
352 {
353     int                   destNode;
354     gni_post_descriptor_t *pd;
355 #if !CMK_SMP
356     struct  rmda_msg      *next;
357 #endif
358 }RDMA_REQUEST;
359
360 #if CMK_SMP
361 PCQueue sendRdmaBuf;
362 typedef struct  msg_list_index
363 {
364     int         next;
365     PCQueue     sendSmsgBuf;
366 } MSG_LIST_INDEX;
367 #else
368 static RDMA_REQUEST        *sendRdmaBuf = 0;
369 static RDMA_REQUEST        *sendRdmaTail = 0;
370 typedef struct  msg_list_index
371 {
372     int         next;
373     MSG_LIST    *sendSmsgBuf;
374     MSG_LIST    *tail;
375 } MSG_LIST_INDEX;
376 #endif
377
378 /* reuse PendingMsg memory */
379 static CONTROL_MSG          *control_freelist=0;
380 static MSG_LIST             *msglist_freelist=0;
381
382 typedef struct smsg_queue
383 {
384     MSG_LIST_INDEX   *smsg_msglist_index;
385     int               smsg_head_index;
386 } SMSG_QUEUE;
387
388 SMSG_QUEUE                  smsg_queue;
389 SMSG_QUEUE                  smsg_oob_queue;
390
391 #if CMK_SMP
392
393 #define FreeMsgList(d)   free(d);
394 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
395
396 #else
397
398 #define FreeMsgList(d)  \
399   (d)->next = msglist_freelist;\
400   msglist_freelist = d;
401
402 #define MallocMsgList(d) \
403   d = msglist_freelist;\
404   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
405              _MEMCHECK(d);\
406   } else msglist_freelist = d->next; \
407   d->next =0;
408 #endif
409
410 #if CMK_SMP
411
412 #define FreeControlMsg(d)      free(d);
413 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
414
415 #else
416
417 #define FreeControlMsg(d)       \
418   (d)->next = control_freelist;\
419   control_freelist = d;
420
421 #define MallocControlMsg(d) \
422   d = control_freelist;\
423   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
424              _MEMCHECK(d);\
425   } else control_freelist = d->next;
426
427 #endif
428
429 static RDMA_REQUEST         *rdma_freelist = NULL;
430
431 #define FreeMediumControlMsg(d)       \
432   (d)->next = medium_control_freelist;\
433   medium_control_freelist = d;
434
435
436 #define MallocMediumControlMsg(d) \
437     d = medium_control_freelist;\
438     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
439     _MEMCHECK(d);\
440 } else mediumcontrol_freelist = d->next;
441
442 # if CMK_SMP
443 #define FreeRdmaRequest(d)       free(d);
444 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
445 #else
446
447 #define FreeRdmaRequest(d)       \
448   (d)->next = rdma_freelist;\
449   rdma_freelist = d;
450
451 #define MallocRdmaRequest(d) \
452   d = rdma_freelist;\
453   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
454              _MEMCHECK(d);\
455   } else rdma_freelist = d->next; \
456     d->next =0;
457 #endif
458
459 /* reuse gni_post_descriptor_t */
460 static gni_post_descriptor_t *post_freelist=0;
461
462 #if  !CMK_SMP
463 #define FreePostDesc(d)       \
464     (d)->next_descr = post_freelist;\
465     post_freelist = d;
466
467 #define MallocPostDesc(d) \
468   d = post_freelist;\
469   if (d==0) { \
470      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
471      _MEMCHECK(d);\
472   } else post_freelist = d->next_descr;
473 #else
474
475 #define FreePostDesc(d)     free(d);
476 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
477
478 #endif
479
480
481 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
482 static int      buffered_smsg_counter = 0;
483
484 /* SmsgSend return success but message sent is not confirmed by remote side */
485 static MSG_LIST *buffered_fma_head = 0;
486 static MSG_LIST *buffered_fma_tail = 0;
487
488 /* functions  */
489 #define IsFree(a,ind)  !( a& (1<<(ind) ))
490 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
491 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
492
493 CpvDeclare(mempool_type*, mempool);
494
495 /* get the upper bound of log 2 */
496 int mylog2(int size)
497 {
498     int op = size;
499     unsigned int ret=0;
500     unsigned int mask = 0;
501     int i;
502     while(op>0)
503     {
504         op = op >> 1;
505         ret++;
506
507     }
508     for(i=1; i<ret; i++)
509     {
510         mask = mask << 1;
511         mask +=1;
512     }
513
514     ret -= ((size &mask) ? 0:1);
515     return ret;
516 }
517
518 static void
519 allgather(void *in,void *out, int len)
520 {
521     static int *ivec_ptr=NULL,already_called=0,job_size=0;
522     int i,rc;
523     int my_rank;
524     char *tmp_buf,*out_ptr;
525
526     if(!already_called) {
527
528         rc = PMI_Get_size(&job_size);
529         CmiAssert(rc == PMI_SUCCESS);
530         rc = PMI_Get_rank(&my_rank);
531         CmiAssert(rc == PMI_SUCCESS);
532
533         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
534         CmiAssert(ivec_ptr != NULL);
535
536         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
537         CmiAssert(rc == PMI_SUCCESS);
538
539         already_called = 1;
540
541     }
542
543     tmp_buf = (char *)malloc(job_size * len);
544     CmiAssert(tmp_buf);
545
546     rc = PMI_Allgather(in,tmp_buf,len);
547     CmiAssert(rc == PMI_SUCCESS);
548
549     out_ptr = out;
550
551     for(i=0;i<job_size;i++) {
552
553         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
554
555     }
556
557     free(tmp_buf);
558 }
559
560 static void
561 allgather_2(void *in,void *out, int len)
562 {
563     //PMI_Allgather is out of order
564     int i,rc, extend_len;
565     int  rank_index;
566     char *out_ptr, *out_ref;
567     char *in2;
568
569     extend_len = sizeof(int) + len;
570     in2 = (char*)malloc(extend_len);
571
572     memcpy(in2, &myrank, sizeof(int));
573     memcpy(in2+sizeof(int), in, len);
574
575     out_ptr = (char*)malloc(mysize*extend_len);
576
577     rc = PMI_Allgather(in2, out_ptr, extend_len);
578     GNI_RC_CHECK("allgather", rc);
579
580     out_ref = out;
581
582     for(i=0;i<mysize;i++) {
583         //rank index 
584         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
585         //copy to the rank index slot
586         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
587     }
588
589     free(out_ptr);
590     free(in2);
591
592 }
593
594 static unsigned int get_gni_nic_address(int device_id)
595 {
596     unsigned int address, cpu_id;
597     gni_return_t status;
598     int i, alps_dev_id=-1,alps_address=-1;
599     char *token, *p_ptr;
600
601     p_ptr = getenv("PMI_GNI_DEV_ID");
602     if (!p_ptr) {
603         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
604        
605         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
606     } else {
607         while ((token = strtok(p_ptr,":")) != NULL) {
608             alps_dev_id = atoi(token);
609             if (alps_dev_id == device_id) {
610                 break;
611             }
612             p_ptr = NULL;
613         }
614         CmiAssert(alps_dev_id != -1);
615         p_ptr = getenv("PMI_GNI_LOC_ADDR");
616         CmiAssert(p_ptr != NULL);
617         i = 0;
618         while ((token = strtok(p_ptr,":")) != NULL) {
619             if (i == alps_dev_id) {
620                 alps_address = atoi(token);
621                 break;
622             }
623             p_ptr = NULL;
624             ++i;
625         }
626         CmiAssert(alps_address != -1);
627         address = alps_address;
628     }
629     return address;
630 }
631
632 static uint8_t get_ptag(void)
633 {
634     char *p_ptr, *token;
635     uint8_t ptag;
636
637     p_ptr = getenv("PMI_GNI_PTAG");
638     CmiAssert(p_ptr != NULL);
639     token = strtok(p_ptr, ":");
640     ptag = (uint8_t)atoi(token);
641     return ptag;
642         
643 }
644
645 static uint32_t get_cookie(void)
646 {
647     uint32_t cookie;
648     char *p_ptr, *token;
649
650     p_ptr = getenv("PMI_GNI_COOKIE");
651     CmiAssert(p_ptr != NULL);
652     token = strtok(p_ptr, ":");
653     cookie = (uint32_t)atoi(token);
654
655     return cookie;
656 }
657
658 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
659 /* TODO: add any that are related */
660 /* =====End of Definitions of Message-Corruption Related Macros=====*/
661
662
663 #include "machine-lrts.h"
664 #include "machine-common-core.c"
665
666 /* Network progress function is used to poll the network when for
667    messages. This flushes receive buffers on some  implementations*/
668 #if CMK_MACHINE_PROGRESS_DEFINED
669 void CmiMachineProgressImpl() {
670 }
671 #endif
672
673 static void SendRdmaMsg();
674 static void PumpNetworkSmsg();
675 static void PumpLocalRdmaTransactions();
676 static int SendBufferMsg(SMSG_QUEUE *queue);
677
678 #if MACHINE_DEBUG_LOG
679 FILE *debugLog = NULL;
680 static CmiInt8 buffered_recv_msg = 0;
681 int         lrts_smsg_success = 0;
682 int         lrts_received_msg = 0;
683 #endif
684
685 static void sweep_mempool(mempool_type *mptr)
686 {
687     block_header *current = &(mptr->block_head);
688
689     printf("[n %d] sweep_mempool slot START.\n", myrank);
690     while( current!= NULL) {
691         printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
692         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
693     }
694     printf("[n %d] sweep_mempool slot END.\n", myrank);
695 }
696
697 inline
698 static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
699 {
700     block_header *current = *from;
701
702     //while(register_memory_size>= MAX_REG_MEM)
703     //{
704         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
705             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
706
707         *from = current;
708         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
709         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh, GetSizeFromHeader(current));
710         SetMemHndlZero(GetMemHndlFromHeader(current));
711     //}
712     return GNI_RC_SUCCESS;
713 }
714
715 inline 
716 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
717 {
718     gni_return_t status = GNI_RC_SUCCESS;
719     //int size = GetMempoolsize(msg);
720     //void *blockaddr = GetMempoolBlockPtr(msg);
721     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
722    
723     block_header *current = &(mptr->block_head);
724     while(register_memory_size>= MAX_REG_MEM)
725     {
726         status = deregisterMemory(mptr, &current);
727         if (status != GNI_RC_SUCCESS) break;
728     }
729     if(register_memory_size>= MAX_REG_MEM) return status;
730
731     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
732     while(1)
733     {
734         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status);
735         if(status == GNI_RC_SUCCESS)
736         {
737             break;
738         }
739         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
740         {
741             CmiAbort("Memory registor for mempool fails\n");
742         }
743         else
744         {
745             status = deregisterMemory(mptr, &current);
746             if (status != GNI_RC_SUCCESS) break;
747         }
748     }; 
749     return status;
750 }
751
752 inline 
753 static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
754 {
755     static int rank = -1;
756     int i;
757     gni_return_t status;
758     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
759     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
760     mempool_type *mptr;
761
762     status = registerFromMempool(mptr1, msg, size, t);
763     if (status == GNI_RC_SUCCESS) return status;
764 #if CMK_SMP 
765     for (i=0; i<CmiMyNodeSize()+1; i++) {
766       rank = (rank+1)%(CmiMyNodeSize()+1);
767       mptr = CpvAccessOther(mempool, rank);
768       if (mptr == mptr1) continue;
769       status = registerFromMempool(mptr, msg, size, t);
770       if (status == GNI_RC_SUCCESS) return status;
771     }
772 #endif
773     return  GNI_RC_ERROR_RESOURCE;
774 }
775
776 inline
777 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
778 {
779     MSG_LIST        *msg_tmp;
780     MallocMsgList(msg_tmp);
781     msg_tmp->destNode = destNode;
782     msg_tmp->size   = size;
783     msg_tmp->msg    = msg;
784     msg_tmp->tag    = tag;
785
786 #if !CMK_SMP
787     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
788         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
789         queue->smsg_head_index = destNode;
790         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
791     }else
792     {
793         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
794         queue->smsg_msglist_index[destNode].tail = msg_tmp;
795     }
796 #else
797     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
798 #endif
799 #if PRINT_SYH
800     buffered_smsg_counter++;
801 #endif
802 }
803
804 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
805 {
806     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
807 }
808
809 inline
810 static void setup_smsg_connection(int destNode)
811 {
812     mdh_addr_list_t  *new_entry = 0;
813     gni_post_descriptor_t *pd;
814     gni_smsg_attr_t      *smsg_attr;
815     gni_return_t status = GNI_RC_NOT_DONE;
816     RDMA_REQUEST        *rdma_request_msg;
817     
818     if(smsg_available_slot == smsg_expand_slots)
819     {
820         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
821         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
822         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
823
824         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
825             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
826             GNI_MEM_READWRITE,   
827             -1,
828             &(new_entry->mdh));
829         smsg_available_slot = 0; 
830         new_entry->next = smsg_dynamic_list;
831         smsg_dynamic_list = new_entry;
832     }
833     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
834     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
835     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
836     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
837     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
838     smsg_attr->buff_size = smsg_memlen;
839     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
840     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
841     smsg_local_attr_vec[destNode] = smsg_attr;
842     smsg_available_slot++;
843     MallocPostDesc(pd);
844     pd->type            = GNI_POST_FMA_PUT;
845     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
846     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
847     pd->length          = sizeof(gni_smsg_attr_t);
848     pd->local_addr      = (uint64_t) smsg_attr;
849     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
850     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
851     pd->src_cq_hndl     = 0;
852     pd->rdma_mode       = 0;
853     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
854     print_smsg_attr(smsg_attr);
855     if(status == GNI_RC_ERROR_RESOURCE )
856     {
857         MallocRdmaRequest(rdma_request_msg);
858         rdma_request_msg->destNode = destNode;
859         rdma_request_msg->pd = pd;
860         /* buffer this request */
861     }
862 #if PRINT_SYH
863     if(status != GNI_RC_SUCCESS)
864        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
865     else
866         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
867 #endif
868 }
869
870 /* useDynamicSMSG */
871 inline 
872 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
873 {
874     gni_return_t status = GNI_RC_NOT_DONE;
875
876     if(mailbox_list->offset == mailbox_list->size)
877     {
878         dynamic_smsg_mailbox_t *new_mailbox_entry;
879         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
880         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
881         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
882         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
883         new_mailbox_entry->offset = 0;
884         
885         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
886             new_mailbox_entry->size, smsg_rx_cqh,
887             GNI_MEM_READWRITE,   
888             -1,
889             &(new_mailbox_entry->mem_hndl));
890
891         GNI_RC_CHECK("register", status);
892         new_mailbox_entry->next = mailbox_list;
893         mailbox_list = new_mailbox_entry;
894     }
895     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
896     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
897     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
898     local_smsg_attr->mbox_offset = mailbox_list->offset;
899     mailbox_list->offset += smsg_memlen;
900     local_smsg_attr->buff_size = smsg_memlen;
901     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
902     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
903 }
904
905 /* useDynamicSMSG */
906 inline 
907 static int connect_to(int destNode)
908 {
909     gni_return_t status = GNI_RC_NOT_DONE;
910     CmiAssert(smsg_connected_flag[destNode] == 0);
911     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
912     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
913     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
914     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
915     
916     CMI_GNI_LOCK
917     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
918     CMI_GNI_UNLOCK
919     if (status == GNI_RC_ERROR_RESOURCE) {
920       /* possibly destNode is making connection at the same time */
921       free(smsg_attr_vector_local[destNode]);
922       smsg_attr_vector_local[destNode] = NULL;
923       free(smsg_attr_vector_remote[destNode]);
924       smsg_attr_vector_remote[destNode] = NULL;
925       mailbox_list->offset -= smsg_memlen;
926       return 0;
927     }
928     GNI_RC_CHECK("GNI_Post", status);
929     smsg_connected_flag[destNode] = 1;
930     return 1;
931 }
932
933 inline 
934 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
935 {
936     unsigned int          remote_address;
937     uint32_t              remote_id;
938     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
939     gni_smsg_attr_t       *smsg_attr;
940     gni_post_descriptor_t *pd;
941     gni_post_state_t      post_state;
942     char                  *real_data; 
943
944     if (useDynamicSMSG) {
945         switch (smsg_connected_flag[destNode]) {
946         case 0: 
947             connect_to(destNode);         /* continue to case 1 */
948         case 1:                           /* pending connection, do nothing */
949             status = GNI_RC_NOT_DONE;
950             if(inbuff ==0)
951                 buffer_small_msgs(queue, msg, size, destNode, tag);
952             return status;
953         }
954     }
955 #if CMK_SMP
956     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
957     {
958 #else
959     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
960     {
961 #endif
962         CMI_GNI_LOCK
963         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
964         CMI_GNI_UNLOCK
965         if(status == GNI_RC_SUCCESS)
966         {
967 #if CMK_SMP_TRACE_COMMTHREAD
968             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
969             { 
970                 START_EVENT();
971                 if ( tag == SMALL_DATA_TAG)
972                     real_data = (char*)msg; 
973                 else 
974                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
975                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
976             }
977 #endif
978             smsg_send_count ++;
979             return status;
980         }else
981             status = GNI_RC_ERROR_RESOURCE;
982     }
983     if(inbuff ==0)
984         buffer_small_msgs(queue, msg, size, destNode, tag);
985     return status;
986 }
987
988
989 inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
990 {
991     /* construct a control message and send */
992     CONTROL_MSG         *control_msg_tmp;
993     MallocControlMsg(control_msg_tmp);
994     control_msg_tmp->source_addr = (uint64_t)msg;
995     control_msg_tmp->seq_id    = seqno;
996     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
997 #if     USE_LRTS_MEMPOOL
998     if(size < BIG_MSG)
999     {
1000         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1001     }
1002     else
1003     {
1004         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1005         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1006         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1007     }
1008 #else
1009     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1010 #endif
1011     return control_msg_tmp;
1012 }
1013
1014 // Large message, send control to receiver, receiver register memory and do a GET, 
1015 // return 1 - send no success
1016 inline
1017 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
1018 {
1019     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1020     uint32_t            vmdh_index  = -1;
1021     int                 size;
1022     int                 offset = 0;
1023     uint64_t            source_addr;
1024     int                 register_size; 
1025     void                *msg;
1026
1027     size    =   control_msg_tmp->total_length;
1028     source_addr = control_msg_tmp->source_addr;
1029     register_size = control_msg_tmp->length;
1030
1031 #if     USE_LRTS_MEMPOOL
1032     if( control_msg_tmp->seq_id == 0 ){
1033         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1034         {
1035             msg = (void*)source_addr;
1036             //register the corresponding mempool
1037             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1038             {
1039                 if(!inbuff)
1040                     buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1041                 return GNI_RC_ERROR_NOMEM;
1042             }
1043             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1044             if(status == GNI_RC_SUCCESS)
1045             {
1046                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1047             }
1048         }else
1049         {
1050             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1051             status = GNI_RC_SUCCESS;
1052         }
1053         if(NoMsgInSend( control_msg_tmp->source_addr))
1054             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1055         else
1056             register_size = 0;
1057     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1058     {
1059         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1060         source_addr += offset;
1061         size = control_msg_tmp->length;
1062         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1063             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1064             {
1065                 if(!inbuff)
1066                     buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1067                 return GNI_RC_ERROR_NOMEM;
1068             }
1069             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
1070             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1071         }
1072         else
1073         {
1074             status = GNI_RC_SUCCESS;
1075         }
1076         register_size = 0;  
1077     }
1078
1079     if(status == GNI_RC_SUCCESS)
1080     {
1081         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
1082         if(status == GNI_RC_SUCCESS)
1083         {
1084             buffered_send_msg += register_size;
1085             if(control_msg_tmp->seq_id == 0)
1086             {
1087                 IncreaseMsgInSend(source_addr);
1088             }
1089             FreeControlMsg(control_msg_tmp);
1090             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
1091         }else
1092             status = GNI_RC_ERROR_RESOURCE;
1093
1094     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1095     {
1096         CmiAbort("Memory registor for large msg\n");
1097     }else 
1098     {
1099         status = GNI_RC_ERROR_NOMEM; 
1100         if(!inbuff)
1101             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1102     }
1103     return status;
1104 #else
1105     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
1106     if(status == GNI_RC_SUCCESS)
1107     {
1108         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
1109         if(status == GNI_RC_SUCCESS)
1110         {
1111             FreeControlMsg(control_msg_tmp);
1112         }
1113     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1114     {
1115         CmiAbort("Memory registor for large msg\n");
1116     }else 
1117     {
1118         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1119     }
1120     return status;
1121 #endif
1122 }
1123
1124 inline void LrtsPrepareEnvelope(char *msg, int size)
1125 {
1126     CmiSetMsgSize(msg, size);
1127 }
1128
1129 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1130 {
1131     gni_return_t        status  =   GNI_RC_SUCCESS;
1132     uint8_t tag;
1133     CONTROL_MSG         *control_msg_tmp;
1134     int                 oob = ( mode & OUT_OF_BAND);
1135     SMSG_QUEUE          *queue;
1136
1137     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1138 #if CMK_USE_OOB
1139     queue = oob? &smsg_oob_queue : &smsg_queue;
1140 #else
1141     queue = &smsg_queue;
1142 #endif
1143
1144     LrtsPrepareEnvelope(msg, size);
1145
1146 #if PRINT_SYH
1147     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1148 #endif 
1149 #if CMK_SMP && COMM_THREAD_SEND
1150     if(size <= SMSG_MAX_MSG)
1151         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1152     else if (size < BIG_MSG) {
1153         control_msg_tmp =  construct_control_msg(size, msg, 0);
1154         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1155     }
1156     else {
1157           CmiSetMsgSeq(msg, 0);
1158           control_msg_tmp =  construct_control_msg(size, msg, 1);
1159           buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1160     }
1161 #else   //non-smp, smp(worker sending)
1162     if(size <= SMSG_MAX_MSG)
1163     {
1164         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1165             CmiFree(msg);
1166     }
1167     else if (size < BIG_MSG) {
1168         control_msg_tmp =  construct_control_msg(size, msg, 0);
1169         send_large_messages(queue, destNode, control_msg_tmp, 0);
1170     }
1171     else {
1172 #if     USE_LRTS_MEMPOOL
1173         CmiSetMsgSeq(msg, 0);
1174         control_msg_tmp =  construct_control_msg(size, msg, 1);
1175         send_large_messages(queue, destNode, control_msg_tmp, 0);
1176 #else
1177         control_msg_tmp =  construct_control_msg(size, msg, 0);
1178         send_large_messages(queue, destNode, control_msg_tmp, 0);
1179 #endif
1180     }
1181 #endif
1182     return 0;
1183 }
1184
1185 static void    PumpDatagramConnection();
1186 static void registerUserTraceEvents() {
1187 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1188     traceRegisterUserEvent("setting up connections", 10);
1189     traceRegisterUserEvent("Receiving small msgs", 20);
1190     traceRegisterUserEvent("Release local transaction", 30);
1191     traceRegisterUserEvent("Sending buffered small msgs", 40);
1192     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1193 #endif
1194 }
1195
1196 static void ProcessDeadlock()
1197 {
1198     static CmiUInt8 *ptr = NULL;
1199     static CmiUInt8  last = 0, mysum, sum;
1200     static int count = 0;
1201     gni_return_t status;
1202     int i;
1203
1204 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1205 //sweep_mempool(CpvAccess(mempool));
1206     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1207     mysum = smsg_send_count + smsg_recv_count;
1208     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1209     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1210     GNI_RC_CHECK("PMI_Allgather", status);
1211     sum = 0;
1212     for (i=0; i<mysize; i++)  sum+= ptr[i];
1213     if (last == 0 || sum == last) 
1214         count++;
1215     else
1216         count = 0;
1217     last = sum;
1218     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1219     if (count == 2) { 
1220         /* detected twice, it is a real deadlock */
1221         if (myrank == 0)  {
1222             CmiPrintf("Charm++> network progress engine stalled, program may hang. Try set environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX to limit the registered memory usage.\n");
1223             CmiAbort("Fatal> Deadlock detected.");
1224         }
1225     }
1226     _detected_hang = 0;
1227 }
1228
1229 static void CheckProgress()
1230 {
1231     if (smsg_send_count == last_smsg_send_count &&
1232         smsg_recv_count == last_smsg_recv_count ) 
1233     {
1234         _detected_hang = 1;
1235 #if !CMK_SMP
1236         if (_detected_hang) ProcessDeadlock();
1237 #endif
1238
1239     }
1240     else {
1241         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1242         last_smsg_send_count = smsg_send_count;
1243         last_smsg_recv_count = smsg_recv_count;
1244         _detected_hang = 0;
1245     }
1246 }
1247
1248 void LrtsPostCommonInit(int everReturn)
1249 {
1250 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1251     CpvInitialize(double, projTraceStart);
1252     /* only PE 0 needs to care about registration (to generate sts file). */
1253     if (CmiMyPe() == 0) {
1254         registerMachineUserEventsFunction(&registerUserTraceEvents);
1255     }
1256 #endif
1257
1258 #if CMK_SMP
1259     CmiIdleState *s=CmiNotifyGetState();
1260     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1261     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1262 #else
1263     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1264     if (useDynamicSMSG)
1265     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1266 #endif
1267
1268     if (_checkProgress)
1269 #if CMK_SMP
1270     if (CmiMyRank() == 0)
1271 #endif
1272     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1273 }
1274
1275 /* this is called by worker thread */
1276 void LrtsPostNonLocal(){
1277 #if CMK_SMP
1278 #if MULTI_THREAD_SEND
1279     if(mysize == 1) return;
1280     PumpLocalRdmaTransactions();
1281 #if CMK_USE_OOB
1282     if (SendBufferMsg(&smsg_oob_queue) == 1)
1283 #endif
1284     SendBufferMsg(&smsg_queue);
1285     SendRdmaMsg();
1286 #endif
1287 #endif
1288 }
1289
1290 /* useDynamicSMSG */
1291 static void    PumpDatagramConnection()
1292 {
1293     uint32_t          remote_address;
1294     uint32_t          remote_id;
1295     gni_return_t status;
1296     gni_post_state_t  post_state;
1297     uint64_t          datagram_id;
1298     int i;
1299
1300    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1301    {
1302        if (datagram_id >= mysize) {           /* bound endpoint */
1303            int pe = datagram_id - mysize;
1304            CMI_GNI_LOCK
1305            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1306            CMI_GNI_UNLOCK
1307            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1308            {
1309                CmiAssert(remote_id == pe);
1310                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1311                GNI_RC_CHECK("Dynamic SMSG Init", status);
1312 #if PRINT_SYH
1313                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1314 #endif
1315                CmiAssert(smsg_connected_flag[pe] == 1);
1316                smsg_connected_flag[pe] = 2;
1317            }
1318        }
1319        else {         /* unbound ep */
1320            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1321            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1322            {
1323                CmiAssert(remote_id<mysize);
1324                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1325                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1326                GNI_RC_CHECK("Dynamic SMSG Init", status);
1327 #if PRINT_SYH
1328                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1329 #endif
1330                smsg_connected_flag[remote_id] = 2;
1331
1332                alloc_smsg_attr(&send_smsg_attr);
1333                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1334                GNI_RC_CHECK("post unbound datagram", status);
1335            }
1336        }
1337    }
1338 }
1339
1340 /* pooling CQ to receive network message */
1341 static void PumpNetworkRdmaMsgs()
1342 {
1343     gni_cq_entry_t      event_data;
1344     gni_return_t        status;
1345
1346 }
1347
1348 inline 
1349 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1350 {
1351     RDMA_REQUEST        *rdma_request_msg;
1352     MallocRdmaRequest(rdma_request_msg);
1353     rdma_request_msg->destNode = inst_id;
1354     rdma_request_msg->pd = pd;
1355 #if CMK_SMP
1356     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1357 #else
1358     if(sendRdmaBuf == 0)
1359     {
1360         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1361     }else{
1362         sendRdmaTail->next = rdma_request_msg;
1363         sendRdmaTail =  rdma_request_msg;
1364     }
1365 #endif
1366
1367 }
1368 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1369 static void PumpNetworkSmsg()
1370 {
1371     uint64_t            inst_id;
1372     int                 ret;
1373     gni_cq_entry_t      event_data;
1374     gni_return_t        status, status2;
1375     void                *header;
1376     uint8_t             msg_tag;
1377     int                 msg_nbytes;
1378     void                *msg_data;
1379     gni_mem_handle_t    msg_mem_hndl;
1380     gni_smsg_attr_t     *smsg_attr;
1381     gni_smsg_attr_t     *remote_smsg_attr;
1382     int                 init_flag;
1383     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1384     uint64_t            source_addr;
1385     SMSG_QUEUE         *queue = &smsg_queue;
1386
1387     while(1)
1388     {
1389         CMI_GNI_LOCK
1390         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1391         CMI_GNI_UNLOCK
1392         if(status != GNI_RC_SUCCESS)
1393             break;
1394         inst_id = GNI_CQ_GET_INST_ID(event_data);
1395         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1396 #if PRINT_SYH
1397         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
1398 #endif
1399         if (useDynamicSMSG) {
1400             /* subtle: smsg may come before connection is setup */
1401             while (smsg_connected_flag[inst_id] != 2) 
1402                PumpDatagramConnection();
1403         }
1404         msg_tag = GNI_SMSG_ANY_TAG;
1405         while(1) {
1406             CMI_GNI_LOCK
1407             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1408             CMI_GNI_UNLOCK
1409             if (status != GNI_RC_SUCCESS)
1410                 break;
1411 #if PRINT_SYH
1412             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1413 #endif
1414             /* copy msg out and then put into queue (small message) */
1415             switch (msg_tag) {
1416             case SMALL_DATA_TAG:
1417             {
1418                 START_EVENT();
1419                 msg_nbytes = CmiGetMsgSize(header);
1420                 msg_data    = CmiAlloc(msg_nbytes);
1421                 memcpy(msg_data, (char*)header, msg_nbytes);
1422 #if CMK_SMP_TRACE_COMMTHREAD
1423                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1424 #endif
1425                 handleOneRecvedMsg(msg_nbytes, msg_data);
1426                 break;
1427             }
1428             case LMSG_INIT_TAG:
1429             {
1430                 getLargeMsgRequest(header, inst_id);
1431                 break;
1432             }
1433             case ACK_TAG:   //msg fit into mempool
1434             {
1435                 /* Get is done, release message . Now put is not used yet*/
1436                 void *msg = (void*)(((CONTROL_MSG *)header)->source_addr);
1437 #if ! USE_LRTS_MEMPOOL
1438                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh, header_tmp->length);
1439 #else
1440                 DecreaseMsgInSend(msg);
1441 #endif
1442                 if(NoMsgInSend(msg))
1443                     buffered_send_msg -= GetMempoolsize(msg);
1444                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1445                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
1446                 break;
1447             }
1448             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1449             {
1450                 header_tmp = (CONTROL_MSG *) header;
1451                 void *msg = (void*)(header_tmp->source_addr);
1452                 int cur_seq = CmiGetMsgSeq(msg);
1453                 int offset = ONE_SEG*(cur_seq+1);
1454                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1455                 buffered_send_msg -= header_tmp->length;
1456                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1457                 if (remain_size < 0) remain_size = 0;
1458                 CmiSetMsgSize(msg, remain_size);
1459                 if(remain_size <= 0) //transaction done
1460                 {
1461                     CmiFree(msg);
1462                 }else if (header_tmp->total_length > offset)
1463                 {
1464                     CmiSetMsgSeq(msg, cur_seq+1);
1465                     control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1466                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
1467                     //send next seg
1468                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1469                          // pipelining
1470                     if (header_tmp->seq_id == 1) {
1471                       int i;
1472                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1473                         int seq = cur_seq+i+2;
1474                         CmiSetMsgSeq(msg, seq-1);
1475                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1476                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1477                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1478                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1479                       }
1480                     }
1481                 }
1482                 break;
1483             }
1484 #if CMK_PERSISTENT_COMM
1485             case PUT_DONE_TAG: //persistent message
1486                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1487                 int size = ((CONTROL_MSG *) header)->length;
1488                 CmiReference(msg);
1489                 handleOneRecvedMsg(size, msg); 
1490                 break;
1491 #endif
1492 #ifdef CMK_DIRECT
1493             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1494                 (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1495                 break;
1496 #endif
1497             default: {
1498                 printf("weird tag problem\n");
1499                 CmiAbort("Unknown tag\n");
1500                      }
1501             }               // end switch
1502 #if PRINT_SYH
1503             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1504 #endif
1505             CMI_GNI_LOCK
1506             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1507             CMI_GNI_UNLOCK
1508             smsg_recv_count ++;
1509             msg_tag = GNI_SMSG_ANY_TAG;
1510         } //endwhile getNext
1511     }   //end while GetEvent
1512     if(status == GNI_RC_ERROR_RESOURCE)
1513     {
1514         GNI_RC_CHECK("Smsg_rx_cq full", status);
1515     }
1516 }
1517
1518 static void printDesc(gni_post_descriptor_t *pd)
1519 {
1520     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1521 }
1522
1523 // for BIG_MSG called on receiver side for receiving control message
1524 // LMSG_INIT_TAG
1525 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1526 {
1527 #if     USE_LRTS_MEMPOOL
1528     CONTROL_MSG         *request_msg;
1529     gni_return_t        status = GNI_RC_SUCCESS;
1530     void                *msg_data;
1531     gni_post_descriptor_t *pd;
1532     gni_mem_handle_t    msg_mem_hndl;
1533     int source, size, transaction_size, offset = 0;
1534     int     register_size = 0;
1535
1536     // initial a get to transfer data from the sender side */
1537     request_msg = (CONTROL_MSG *) header;
1538     size = request_msg->total_length;
1539     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1540     if(request_msg->seq_id < 2)   {
1541         msg_data = CmiAlloc(size);
1542         CmiSetMsgSeq(msg_data, 0);
1543         _MEMCHECK(msg_data);
1544     }
1545     else {
1546         offset = ONE_SEG*(request_msg->seq_id-1);
1547         msg_data = (char*)request_msg->dest_addr + offset;
1548     }
1549    
1550     MallocPostDesc(pd);
1551     pd->cqwrite_value = request_msg->seq_id;
1552     if( request_msg->seq_id == 0)
1553     {
1554         pd->local_mem_hndl= GetMemHndl(msg_data);
1555         transaction_size = ALIGN64(size);
1556         if(IsMemHndlZero(pd->local_mem_hndl))
1557         {   
1558             status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
1559             if(status == GNI_RC_SUCCESS)
1560             {
1561                 pd->local_mem_hndl = GetMemHndl(msg_data);
1562             }
1563             else
1564             {
1565                 SetMemHndlZero(pd->local_mem_hndl);
1566             }
1567         }
1568         if(NoMsgInRecv( (void*)(msg_data)))
1569             register_size = GetMempoolsize((void*)(msg_data));
1570         else
1571             register_size = 0;
1572     }
1573     else{
1574         transaction_size = ALIGN64(request_msg->length);
1575         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
1576         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1577         {
1578             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1579         }
1580     }
1581     pd->first_operand = ALIGN64(size);                   //  total length
1582
1583     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1584         pd->type            = GNI_POST_FMA_GET;
1585     else
1586         pd->type            = GNI_POST_RDMA_GET;
1587 #if REMOTE_EVENT
1588     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1589 #else
1590     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1591 #endif
1592     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1593     pd->length          = transaction_size;
1594     pd->local_addr      = (uint64_t) msg_data;
1595     pd->remote_addr     = request_msg->source_addr + offset;
1596     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1597     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1598     pd->rdma_mode       = 0;
1599     pd->amo_cmd         = 0;
1600
1601     //memory registration success
1602     if(status == GNI_RC_SUCCESS)
1603     {
1604         CMI_GNI_LOCK
1605         if(pd->type == GNI_POST_RDMA_GET) 
1606             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1607         else
1608             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1609         CMI_GNI_UNLOCK
1610
1611         if(status == GNI_RC_SUCCESS )
1612         {
1613             if(pd->cqwrite_value == 0)
1614             {
1615 #if MACHINE_DEBUG_LOG
1616                 buffered_recv_msg += register_size;
1617                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1618 #endif
1619                 IncreaseMsgInRecv(msg_data);
1620             }
1621         }
1622     }else
1623     {
1624         SetMemHndlZero((pd->local_mem_hndl));
1625     }
1626     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1627     {
1628         bufferRdmaMsg(inst_id, pd); 
1629     }else {
1630          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
1631         GNI_RC_CHECK("GetLargeAFter posting", status);
1632     }
1633 #else
1634     CONTROL_MSG         *request_msg;
1635     gni_return_t        status;
1636     void                *msg_data;
1637     gni_post_descriptor_t *pd;
1638     RDMA_REQUEST        *rdma_request_msg;
1639     gni_mem_handle_t    msg_mem_hndl;
1640     //int source;
1641     // initial a get to transfer data from the sender side */
1642     request_msg = (CONTROL_MSG *) header;
1643     msg_data = CmiAlloc(request_msg->length);
1644     _MEMCHECK(msg_data);
1645
1646     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, status)
1647
1648     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1649     {
1650         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1651     }
1652
1653     MallocPostDesc(pd);
1654     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1655         pd->type            = GNI_POST_FMA_GET;
1656     else
1657         pd->type            = GNI_POST_RDMA_GET;
1658 #if REMOTE_EVENT
1659     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1660 #else
1661     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1662 #endif
1663     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1664     pd->length          = ALIGN64(request_msg->length);
1665     pd->local_addr      = (uint64_t) msg_data;
1666     pd->remote_addr     = request_msg->source_addr;
1667     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1668     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1669     pd->rdma_mode       = 0;
1670     pd->amo_cmd         = 0;
1671
1672     //memory registration successful
1673     if(status == GNI_RC_SUCCESS)
1674     {
1675         pd->local_mem_hndl  = msg_mem_hndl;
1676         CMI_GNI_LOCK
1677         if(pd->type == GNI_POST_RDMA_GET) 
1678             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1679         else
1680             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1681         CMI_GNI_UNLOCK
1682     }else
1683     {
1684         SetMemHndlZero(pd->local_mem_hndl);
1685     }
1686     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1687     {
1688         MallocRdmaRequest(rdma_request_msg);
1689         rdma_request_msg->next = 0;
1690         rdma_request_msg->destNode = inst_id;
1691         rdma_request_msg->pd = pd;
1692         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1693     }else {
1694         GNI_RC_CHECK("AFter posting", status);
1695     }
1696 #endif
1697 }
1698
1699 static void PumpLocalRdmaTransactions()
1700 {
1701     gni_cq_entry_t          ev;
1702     gni_return_t            status;
1703     uint64_t                type, inst_id;
1704     gni_post_descriptor_t   *tmp_pd;
1705     MSG_LIST                *ptr;
1706     CONTROL_MSG             *ack_msg_tmp;
1707     uint8_t                 msg_tag;
1708 #ifdef CMK_DIRECT
1709     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1710 #endif
1711     SMSG_QUEUE         *queue = &smsg_queue;
1712
1713     while(1) {
1714         CMI_GNI_LOCK 
1715         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1716         CMI_GNI_UNLOCK
1717         if(status != GNI_RC_SUCCESS) break;
1718         
1719         type = GNI_CQ_GET_TYPE(ev);
1720         if (type == GNI_CQ_EVENT_TYPE_POST)
1721         {
1722             inst_id     = GNI_CQ_GET_INST_ID(ev);
1723 #if PRINT_SYH
1724             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1725 #endif
1726             CMI_GNI_LOCK
1727             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1728             CMI_GNI_UNLOCK
1729 #ifdef CMK_DIRECT
1730             if(tmp_pd->amo_cmd == 1)
1731             {
1732                 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1733                 cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
1734                 cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
1735                 cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
1736             }
1737             else{
1738                 MallocControlMsg(ack_msg_tmp);
1739                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1740                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1741             }
1742 #else
1743             MallocControlMsg(ack_msg_tmp);
1744             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1745             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1746 #endif
1747             ////Message is sent, free message , put is not used now
1748             switch (tmp_pd->type) {
1749 #if CMK_PERSISTENT_COMM
1750             case GNI_POST_RDMA_PUT:
1751 #if ! USE_LRTS_MEMPOOL
1752                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1753 #endif
1754             case GNI_POST_FMA_PUT:
1755                 CmiFree((void *)tmp_pd->local_addr);
1756                 msg_tag = PUT_DONE_TAG;
1757                 break;
1758 #endif
1759 #ifdef CMK_DIRECT
1760             case GNI_POST_RDMA_PUT:
1761             case GNI_POST_FMA_PUT:
1762                 //sender ACK to receiver to trigger it is done
1763                 msg_tag = DIRECT_PUT_DONE_TAG;
1764                 break;
1765 #endif
1766             case GNI_POST_RDMA_GET:
1767             case GNI_POST_FMA_GET:
1768 #if  ! USE_LRTS_MEMPOOL
1769                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
1770                 msg_tag = ACK_TAG;  
1771 #else
1772                 ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
1773                 if(ack_msg_tmp->seq_id > 0)      // BIG_MSG
1774                 {
1775                     msg_tag = BIG_MSG_TAG; 
1776                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
1777                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1778                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1779                 } 
1780                 else
1781                 {
1782                     msg_tag = ACK_TAG;  
1783                     ack_msg_tmp->dest_addr = tmp_pd->local_addr;
1784                 }
1785                 ack_msg_tmp->length = tmp_pd->length;
1786                 ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1787 #endif
1788                 break;
1789             default:
1790                 CmiPrintf("type=%d\n", tmp_pd->type);
1791                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1792             }
1793 #if CMK_DIRECT
1794             if(tmp_pd->amo_cmd == 1)
1795                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1796             else
1797 #endif
1798                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
1799             if(status == GNI_RC_SUCCESS)
1800             {
1801 #if CMK_DIRECT
1802                 if(tmp_pd->amo_cmd == 1)
1803                     free(cmk_direct_done_msg); 
1804                 else
1805 #endif
1806                     FreeControlMsg(ack_msg_tmp);
1807
1808             }
1809 #if CMK_PERSISTENT_COMM
1810             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1811 #endif
1812             {
1813                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1814 #if PRINT_SYH
1815                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1816 #endif
1817 #if CMK_SMP_TRACE_COMMTHREAD
1818                     START_EVENT();
1819 #endif
1820                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1821                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1822 #if MACHINE_DEBUG_LOG
1823                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1824                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1825                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
1826 #endif
1827 #if CMK_SMP_TRACE_COMMTHREAD
1828                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1829 #endif
1830                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1831                 }else if(msg_tag == BIG_MSG_TAG){
1832                     void *msg = (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1833                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1834                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1835 #if CMK_SMP_TRACE_COMMTHREAD
1836                         START_EVENT();
1837 #endif
1838 #if PRINT_SYH
1839                         printf("Pipeline msg done [%d]\n", myrank);
1840 #endif
1841 #if CMK_SMP_TRACE_COMMTHREAD
1842                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1843 #endif
1844                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1845                     }
1846                 }
1847             }
1848             FreePostDesc(tmp_pd);
1849         }
1850     } //end while
1851     if(status == GNI_RC_ERROR_RESOURCE)
1852     {
1853         GNI_RC_CHECK("Smsg_tx_cq full", status);
1854     }
1855 }
1856
1857 static void  SendRdmaMsg()
1858 {
1859     gni_return_t            status = GNI_RC_SUCCESS;
1860     gni_mem_handle_t        msg_mem_hndl;
1861     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
1862     RDMA_REQUEST            *pre = 0;
1863     int i, register_size = 0;
1864     void                    *msg;
1865 #if CMK_SMP
1866     int len = PCQueueLength(sendRdmaBuf);
1867     for (i=0; i<len; i++)
1868     {
1869         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
1870         if (ptr == NULL) break;
1871 #else
1872     ptr = sendRdmaBuf;
1873     while (ptr!=0)
1874     {
1875 #endif 
1876         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1877         gni_post_descriptor_t *pd = ptr->pd;
1878         status = GNI_RC_SUCCESS;
1879         
1880         if(pd->cqwrite_value == 0)
1881         {
1882             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
1883             {
1884                 msg = (void*)(pd->local_addr);
1885                 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
1886                 if(status == GNI_RC_SUCCESS)
1887                 {
1888                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1889                 }
1890             }else
1891             {
1892                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1893             }
1894             if(NoMsgInRecv( (void*)(pd->local_addr)))
1895                 register_size = GetMempoolsize((void*)(pd->local_addr));
1896             else
1897                 register_size = 0;
1898         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
1899         {
1900             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
1901         }
1902         if(status == GNI_RC_SUCCESS)        //mem register good
1903         {
1904             CMI_GNI_LOCK
1905             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1906                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1907             else
1908                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1909             CMI_GNI_UNLOCK
1910             if(status == GNI_RC_SUCCESS)    //post good
1911             {
1912 #if !CMK_SMP
1913                 tmp_ptr = ptr;
1914                 if(pre != 0) {
1915                     pre->next = ptr->next;
1916                 }
1917                 else {
1918                     sendRdmaBuf = ptr->next;
1919                 }
1920                 ptr = ptr->next;
1921                 FreeRdmaRequest(tmp_ptr);
1922 #endif
1923                 if(pd->cqwrite_value == 0)
1924                 {
1925                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
1926                 }
1927 #if MACHINE_DEBUG_LOG
1928                 buffered_recv_msg += register_size;
1929                 MACHSTATE(8, "GO request from buffered\n"); 
1930 #endif
1931             }else           // cannot post
1932             {
1933 #if CMK_SMP
1934                 PCQueuePush(sendRdmaBuf, (char*)ptr);
1935 #else
1936                 pre = ptr;
1937                 ptr = ptr->next;
1938 #endif
1939                 break;
1940             }
1941         } else          //memory registration fails
1942         {
1943 #if CMK_SMP
1944             PCQueuePush(sendRdmaBuf, (char*)ptr);
1945 #else
1946             pre = ptr;
1947             ptr = ptr->next;
1948 #endif
1949         }
1950     } //end while
1951 #if ! CMK_SMP
1952     if(ptr == 0)
1953         sendRdmaTail = pre;
1954 #endif
1955 }
1956
1957 // return 1 if all messages are sent
1958 static int SendBufferMsg(SMSG_QUEUE *queue)
1959 {
1960     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
1961     CONTROL_MSG         *control_msg_tmp;
1962     gni_return_t        status;
1963     int                 done = 1;
1964     int                 register_size;
1965     void                *register_addr;
1966     int                 index_previous = -1;
1967     int                 index = queue->smsg_head_index;
1968 #if CMI_EXERT_SEND_CAP
1969     int                 sent_cnt = 0;
1970 #endif
1971
1972 #if !CMK_SMP
1973     index = queue->smsg_head_index;
1974 #else
1975     index = 0;
1976 #endif
1977 #if CMK_SMP
1978     while(index <mysize)
1979     {
1980         int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
1981         for (i=0; i<len; i++) 
1982         {
1983             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
1984             if (ptr == 0) break;
1985 #else
1986     while(index != -1)
1987     {
1988         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
1989         pre = 0;
1990         while(ptr != 0)
1991         {
1992 #endif
1993             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
1994             status = GNI_RC_ERROR_RESOURCE;
1995             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
1996                 /* connection not exists yet */
1997             }
1998             else
1999             switch(ptr->tag)
2000             {
2001             case SMALL_DATA_TAG:
2002                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
2003                 if(status == GNI_RC_SUCCESS)
2004                 {
2005                     CmiFree(ptr->msg);
2006                 }
2007                 break;
2008             case LMSG_INIT_TAG:
2009                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2010                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2011                 break;
2012             case   ACK_TAG:
2013             case   BIG_MSG_TAG:
2014                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
2015                 if(status == GNI_RC_SUCCESS)
2016                 {
2017                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
2018                 }
2019                 break;
2020 #ifdef CMK_DIRECT
2021             case DIRECT_PUT_DONE_TAG:
2022                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
2023                 if(status == GNI_RC_SUCCESS)
2024                 {
2025                     free((CMK_DIRECT_HEADER*)ptr->msg);
2026                 }
2027                 break;
2028
2029 #endif
2030             default:
2031                 printf("Weird tag\n");
2032                 CmiAbort("should not happen\n");
2033             }       // end switch
2034             if(status == GNI_RC_SUCCESS)
2035             {
2036 #if !CMK_SMP
2037                 tmp_ptr = ptr;
2038                 if(pre)
2039                 {
2040                     ptr = pre ->next = ptr->next;
2041                 }else
2042                 {
2043                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2044                 }
2045                 FreeMsgList(tmp_ptr);
2046 #else
2047                 FreeMsgList(ptr);
2048 #endif
2049 #if PRINT_SYH
2050                 buffered_smsg_counter--;
2051                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2052 #endif
2053 #if CMI_EXERT_SEND_CAP
2054                 sent_cnt++;
2055                 if(sent_cnt == SEND_CAP)
2056                     break;
2057 #endif
2058             }else {
2059 #if CMK_SMP
2060                 PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
2061 #else
2062                 pre = ptr;
2063                 ptr=ptr->next;
2064 #endif
2065                 done = 0;
2066                 if(status == GNI_RC_ERROR_RESOURCE)
2067                     break;
2068             } 
2069         } //end while
2070 #if !CMK_SMP
2071         if(ptr == 0)
2072             queue->smsg_msglist_index[index].tail = pre;
2073         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2074         {
2075             if(index_previous != -1)
2076                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2077             else
2078                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2079         }else
2080         {index_previous = index;
2081         }
2082         index = queue->smsg_msglist_index[index].next;
2083 #else
2084         index++;
2085 #endif
2086
2087 #if CMI_EXERT_SEND_CAP
2088         if(sent_cnt == SEND_CAP)
2089                 break;
2090 #endif
2091     }   // end pooling for all cores
2092     return done;
2093 }
2094
2095 static void ProcessDeadlock();
2096 void LrtsAdvanceCommunication(int whileidle)
2097 {
2098     /*  Receive Msg first */
2099 #if CMK_SMP_TRACE_COMMTHREAD
2100     double startT, endT;
2101 #endif
2102     if (useDynamicSMSG && whileidle)
2103     {
2104 #if CMK_SMP_TRACE_COMMTHREAD
2105         startT = CmiWallTimer();
2106 #endif
2107         PumpDatagramConnection();
2108 #if CMK_SMP_TRACE_COMMTHREAD
2109         endT = CmiWallTimer();
2110         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2111 #endif
2112     }
2113
2114 #if CMK_SMP_TRACE_COMMTHREAD
2115     startT = CmiWallTimer();
2116 #endif
2117     PumpNetworkSmsg();
2118     //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2119 #if CMK_SMP_TRACE_COMMTHREAD
2120     endT = CmiWallTimer();
2121     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2122 #endif
2123
2124 #if CMK_SMP_TRACE_COMMTHREAD
2125     startT = CmiWallTimer();
2126 #endif
2127     PumpLocalRdmaTransactions();
2128     //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2129 #if CMK_SMP_TRACE_COMMTHREAD
2130     endT = CmiWallTimer();
2131     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2132 #endif
2133     /* Send buffered Message */
2134 #if CMK_SMP_TRACE_COMMTHREAD
2135     startT = CmiWallTimer();
2136 #endif
2137 #if CMK_USE_OOB
2138     if (SendBufferMsg(&smsg_oob_queue) == 1)
2139 #endif
2140     SendBufferMsg(&smsg_queue);
2141     //MACHSTATE(8, "after SendBufferMsg\n") ; 
2142 #if CMK_SMP_TRACE_COMMTHREAD
2143     endT = CmiWallTimer();
2144     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2145 #endif
2146
2147 #if CMK_SMP_TRACE_COMMTHREAD
2148     startT = CmiWallTimer();
2149 #endif
2150     SendRdmaMsg();
2151     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
2152 #if CMK_SMP_TRACE_COMMTHREAD
2153     endT = CmiWallTimer();
2154     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2155 #endif
2156
2157 #if CMK_SMP
2158     if (_detected_hang)  ProcessDeadlock();
2159 #endif
2160 }
2161
2162 /* useDynamicSMSG */
2163 static void _init_dynamic_smsg()
2164 {
2165     gni_return_t status;
2166     uint32_t     vmdh_index = -1;
2167     int i;
2168
2169     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2170     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2171     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2172     for(i=0; i<mysize; i++) {
2173         smsg_connected_flag[i] = 0;
2174         smsg_attr_vector_local[i] = NULL;
2175         smsg_attr_vector_remote[i] = NULL;
2176     }
2177     if(mysize <=512)
2178     {
2179         SMSG_MAX_MSG = 4096;
2180     }else if (mysize <= 4096)
2181     {
2182         SMSG_MAX_MSG = 4096/mysize * 1024;
2183     }else if (mysize <= 16384)
2184     {
2185         SMSG_MAX_MSG = 512;
2186     }else {
2187         SMSG_MAX_MSG = 256;
2188     }
2189
2190     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2191     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2192     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2193     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2194     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2195
2196     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2197     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2198     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2199     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2200     mailbox_list->offset = 0;
2201     mailbox_list->next = 0;
2202     
2203     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2204         mailbox_list->size, smsg_rx_cqh,
2205         GNI_MEM_READWRITE,   
2206         vmdh_index,
2207         &(mailbox_list->mem_hndl));
2208     GNI_RC_CHECK("MEMORY registration for smsg", status);
2209
2210     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2211     GNI_RC_CHECK("Unbound EP", status);
2212     
2213     alloc_smsg_attr(&send_smsg_attr);
2214
2215     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2216     GNI_RC_CHECK("post unbound datagram", status);
2217
2218       /* always pre-connect to proc 0 */
2219     //if (myrank != 0) connect_to(0);
2220 }
2221
2222 static void _init_static_smsg()
2223 {
2224     gni_smsg_attr_t      *smsg_attr;
2225     gni_smsg_attr_t      remote_smsg_attr;
2226     gni_smsg_attr_t      *smsg_attr_vec;
2227     gni_mem_handle_t     my_smsg_mdh_mailbox;
2228     int      ret, i;
2229     gni_return_t status;
2230     uint32_t              vmdh_index = -1;
2231     mdh_addr_t            base_infor;
2232     mdh_addr_t            *base_addr_vec;
2233     char *env;
2234
2235     if(mysize <=512)
2236     {
2237         SMSG_MAX_MSG = 1024;
2238     }else if (mysize <= 4096)
2239     {
2240         SMSG_MAX_MSG = 1024;
2241     }else if (mysize <= 16384)
2242     {
2243         SMSG_MAX_MSG = 512;
2244     }else {
2245         SMSG_MAX_MSG = 256;
2246     }
2247     
2248     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2249     if (env) SMSG_MAX_MSG = atoi(env);
2250     CmiAssert(SMSG_MAX_MSG > 0);
2251
2252     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2253     
2254     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2255     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2256     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2257     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2258     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2259     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2260     CmiAssert(ret == 0);
2261     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2262     
2263     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2264             smsg_memlen*(mysize), smsg_rx_cqh,
2265             GNI_MEM_READWRITE,   
2266             vmdh_index,
2267             &my_smsg_mdh_mailbox);
2268     register_memory_size += smsg_memlen*(mysize);
2269     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2270
2271     if (myrank == 0)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2272     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
2273
2274     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2275     base_infor.mdh =  my_smsg_mdh_mailbox;
2276     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2277
2278     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2279  
2280     for(i=0; i<mysize; i++)
2281     {
2282         if(i==myrank)
2283             continue;
2284         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2285         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2286         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2287         smsg_attr[i].mbox_offset = i*smsg_memlen;
2288         smsg_attr[i].buff_size = smsg_memlen;
2289         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2290         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2291     }
2292
2293     for(i=0; i<mysize; i++)
2294     {
2295         if (myrank == i) continue;
2296
2297         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2298         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2299         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2300         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2301         remote_smsg_attr.buff_size = smsg_memlen;
2302         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2303         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2304
2305         /* initialize the smsg channel */
2306         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2307         GNI_RC_CHECK("SMSG Init", status);
2308     } //end initialization
2309
2310     free(base_addr_vec);
2311     free(smsg_attr);
2312
2313     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2314     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2315
2316
2317 inline
2318 static void _init_send_queue(SMSG_QUEUE *queue)
2319 {
2320      int i;
2321      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2322      for(i =0; i<mysize; i++)
2323      {
2324         queue->smsg_msglist_index[i].next = -1;
2325 #if CMK_SMP
2326         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2327 #else
2328         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2329 #endif
2330         
2331      }
2332      queue->smsg_head_index = -1;
2333 }
2334
2335 inline
2336 static void _init_smsg()
2337 {
2338     if(mysize > 1) {
2339         if (useDynamicSMSG)
2340             _init_dynamic_smsg();
2341         else
2342             _init_static_smsg();
2343     }
2344
2345     _init_send_queue(&smsg_queue);
2346 #if CMK_USE_OOB
2347     _init_send_queue(&smsg_oob_queue);
2348 #endif
2349 }
2350
2351 static void _init_static_msgq()
2352 {
2353     gni_return_t status;
2354     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2355     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2356     msgq_attrs.smsg_q_sz = 1;
2357     msgq_attrs.rcv_pool_sz = 1;
2358     msgq_attrs.num_msgq_eps = 2;
2359     msgq_attrs.nloc_insts = 8;
2360     msgq_attrs.modes = 0;
2361     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2362
2363     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2364     GNI_RC_CHECK("MSGQ Init", status);
2365
2366
2367 }
2368
2369 #if CMK_SMP && STEAL_MEMPOOL
2370 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2371 {
2372     void *pool = NULL;
2373     int i, k;
2374     // check other ranks
2375     for (k=0; k<CmiMyNodeSize()+1; k++) {
2376         i = (CmiMyRank()+k)%CmiMyNodeSize();
2377         if (i==CmiMyRank()) continue;
2378         mempool_type *mptr = CpvAccessOther(mempool, i);
2379         CmiLock(mptr->mempoolLock);
2380         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2381         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2382             CmiUnlock(mptr->mempoolLock);
2383             continue;
2384         }
2385         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2386         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2387             /* search in the free list */
2388           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2389           mempool_header *current = free_header;
2390           while (current) {
2391             if (current->next_free == (char*)header-(char*)mptr) break;
2392             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2393           }
2394           if (current == NULL) {         /* not found in free list */
2395             CmiUnlock(mptr->mempoolLock);
2396             continue;
2397           }
2398 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2399             /* search the previous memblock, and remove the tail */
2400           mempool_block *ptr = (mempool_block *)mptr;
2401           while (ptr) {
2402             if (ptr->memblock_next == mptr->memblock_tail) break;
2403             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2404           }
2405           CmiAssert(ptr!=NULL);
2406           ptr->memblock_next = 0;
2407           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2408
2409             /* remove memblock from the free list */
2410           current->next_free = header->next_free;
2411           if (header == free_header) mptr->freelist_head = header->next_free;
2412
2413           CmiUnlock(mptr->mempoolLock);
2414
2415           pool = (void*)tail;
2416           *mem_hndl = tail->mem_hndl;
2417           *size = tail->size;
2418           return pool;
2419         }
2420         CmiUnlock(mptr->mempoolLock);
2421     }
2422
2423       /* steal failed, deregister and free memblock now */
2424     int freed = 0;
2425     for (k=0; k<CmiMyNodeSize()+1; k++) {
2426         i = (CmiMyRank()+k)%CmiMyNodeSize();
2427         mempool_type *mptr = CpvAccessOther(mempool, i);
2428         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2429
2430         mempool_block *mempools_head = &(mptr->mempools_head);
2431         mempool_block *current = mempools_head;
2432         mempool_block *prev = NULL;
2433
2434         while (current) {
2435           int isfree = 0;
2436           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2437 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2438           mempool_header *cur = free_header;
2439           mempool_header *header;
2440           if (current != mempools_head) {
2441             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2442              /* search in free list */
2443             if (header->size == current->size - sizeof(mempool_block)) {
2444               cur = free_header;
2445               while (cur) {
2446                 if (cur->next_free == (char*)header-(char*)mptr) break;
2447                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2448               }
2449               if (cur != NULL) isfree = 1;
2450             }
2451           }
2452           if (isfree) {
2453               /* remove from free list */
2454             cur->next_free = header->next_free;
2455             if (header == free_header) mptr->freelist_head = header->next_free;
2456              // deregister
2457             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh,0)
2458             GNI_RC_CHECK("steal Mempool de-register", status);
2459             mempool_block *ptr = current;
2460             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2461             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2462 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2463             freed += ptr->size;
2464             free(ptr);
2465              // try now
2466             if (freed > *size) {
2467               if (pool == NULL) {
2468                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2469                 CmiAssert(ret == 0);
2470               }
2471               MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh, status)
2472               if (status == GNI_RC_SUCCESS) {
2473                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2474 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2475                 return pool;
2476               }
2477 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2478             }
2479           }
2480           else {
2481              prev = current;
2482              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2483           }
2484         }
2485
2486         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2487     }
2488       /* still no luck registering pool */
2489     if (pool) free(pool);
2490     return NULL;
2491 }
2492 #endif
2493
2494 static long long int total_mempool_size = 0;
2495 static long long int total_mempool_calls = 0;
2496
2497 #if USE_LRTS_MEMPOOL
2498 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2499 {
2500     void *pool;
2501     int ret;
2502
2503     int default_size =  expand_flag? _expand_mem : _mempool_size;
2504     if (*size < default_size) *size = default_size;
2505     total_mempool_size += *size;
2506     total_mempool_calls += 1;
2507     if (*size > MAX_REG_MEM) {
2508         printf("Error: A mempool block with size %d is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX.", *size);
2509         CmiAbort("alloc_mempool_block");
2510     }
2511     ret = posix_memalign(&pool, ALIGNBUF, *size);
2512     if (ret != 0) {
2513 #if CMK_SMP && STEAL_MEMPOOL
2514       pool = steal_mempool_block(size, mem_hndl);
2515       if (pool != NULL) return pool;
2516 #endif
2517       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2518       if (ret == ENOMEM)
2519         CmiAbort("alloc_mempool_block: out of memory.");
2520       else
2521         CmiAbort("alloc_mempool_block: posix_memalign failed");
2522     }
2523     SetMemHndlZero((*mem_hndl));
2524     
2525     return pool;
2526 }
2527
2528 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2529 {
2530     if(!(IsMemHndlZero(mem_hndl)))
2531     {
2532         gni_return_t status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2533         GNI_RC_CHECK("free_mempool_block Mempool de-register", status);
2534     }
2535     free(ptr);
2536 }
2537 #endif
2538
2539 void LrtsPreCommonInit(int everReturn){
2540 #if USE_LRTS_MEMPOOL
2541     CpvInitialize(mempool_type*, mempool);
2542     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block);
2543     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2544 #endif
2545 }
2546
2547
2548 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2549 {
2550     register int            i;
2551     int                     rc;
2552     int                     device_id = 0;
2553     unsigned int            remote_addr;
2554     gni_cdm_handle_t        cdm_hndl;
2555     gni_return_t            status = GNI_RC_SUCCESS;
2556     uint32_t                vmdh_index = -1;
2557     uint8_t                 ptag;
2558     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2559     int                     first_spawned;
2560     int                     physicalID;
2561     char                   *env;
2562
2563     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2564     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2565     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2566    
2567     status = PMI_Init(&first_spawned);
2568     GNI_RC_CHECK("PMI_Init", status);
2569
2570     status = PMI_Get_size(&mysize);
2571     GNI_RC_CHECK("PMI_Getsize", status);
2572
2573     status = PMI_Get_rank(&myrank);
2574     GNI_RC_CHECK("PMI_getrank", status);
2575
2576     //physicalID = CmiPhysicalNodeID(myrank);
2577     
2578     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2579
2580     *myNodeID = myrank;
2581     *numNodes = mysize;
2582   
2583 #if MULTI_THREAD_SEND
2584     /* Currently, we only consider the case that comm. thread will only recv msgs */
2585     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2586 #endif
2587
2588     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2589     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2590     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2591
2592     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2593     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2594     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2595
2596     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
2597     if (env) useDynamicSMSG = 1;
2598     if (!useDynamicSMSG)
2599       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2600     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2601     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2602     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2603     
2604     if(myrank == 0)
2605     {
2606         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2607         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2608     }
2609 #ifdef USE_ONESIDED
2610     onesided_init(NULL, &onesided_hnd);
2611
2612     // this is a GNI test, so use the libonesided bypass functionality
2613     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2614     local_addr = gniGetNicAddress();
2615 #else
2616     ptag = get_ptag();
2617     cookie = get_cookie();
2618 #if 0
2619     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2620 #endif
2621     //Create and attach to the communication  domain */
2622     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2623     GNI_RC_CHECK("GNI_CdmCreate", status);
2624     //* device id The device id is the minor number for the device
2625     //that is assigned to the device by the system when the device is created.
2626     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2627     //where X is the device number 0 default 
2628     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2629     GNI_RC_CHECK("GNI_CdmAttach", status);
2630     local_addr = get_gni_nic_address(0);
2631 #endif
2632     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2633     _MEMCHECK(MPID_UGNI_AllAddr);
2634     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2635     /* create the local completion queue */
2636     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2637     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2638     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2639     
2640     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2641     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2642     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2643
2644     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2645     GNI_RC_CHECK("Create CQ (rx)", status);
2646     
2647     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2648     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2649     
2650     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2651     //GNI_RC_CHECK("Create BTE CQ", status);
2652
2653     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2654     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2655     _MEMCHECK(ep_hndl_array);
2656 #if CMK_SMP && !COMM_THREAD_SEND
2657     tx_cq_lock  = CmiCreateLock();
2658     rx_cq_lock  = CmiCreateLock();
2659 #endif
2660     for (i=0; i<mysize; i++) {
2661         if(i == myrank) continue;
2662         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2663         GNI_RC_CHECK("GNI_EpCreate ", status);   
2664         remote_addr = MPID_UGNI_AllAddr[i];
2665         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2666         GNI_RC_CHECK("GNI_EpBind ", status);   
2667     }
2668
2669     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2670     _init_smsg();
2671     PMI_Barrier();
2672
2673 #if     USE_LRTS_MEMPOOL
2674     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
2675     if (env) _mempool_size = CmiReadSize(env);
2676     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
2677         _mempool_size = CmiReadSize(env);
2678
2679
2680     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2681     if (env) MAX_REG_MEM = CmiReadSize(env);
2682     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size")) 
2683         MAX_REG_MEM = CmiReadSize(env);
2684
2685     env = getenv("CHARM_UGNI_SEND_MAX");
2686     if (env) MAX_BUFF_SEND = CmiReadSize(env);
2687     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send")) 
2688         MAX_BUFF_SEND = CmiReadSize(env);
2689
2690     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2691     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2692
2693     if (myrank==0) {
2694         printf("Charm++> memory pool init size: %1.fMB\n", _mempool_size/1024.0/1024);
2695         printf("Charm++> memory pool max size: %1.fMB, max for send: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
2696         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
2697             /* memblock can expand to BIG_MSG * 2 size */
2698             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
2699             CmiAbort("mempool maximum size is too small. \n");
2700         }
2701 #if CMK_SMP && MULTI_THREAD_SEND
2702         printf("Charm++> worker thread sending messages\n");
2703 #elif CMK_SMP && COMM_THREAD_SEND
2704         printf("Charm++> only comm thread send/recv messages\n");
2705 #endif
2706     }
2707 #endif
2708
2709     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
2710     if (env) _checkProgress = 0;
2711     if (mysize == 1) _checkProgress = 0;
2712
2713     /* init DMA buffer for medium message */
2714
2715     //_init_DMA_buffer();
2716     
2717     free(MPID_UGNI_AllAddr);
2718 #if CMK_SMP
2719     sendRdmaBuf = PCQueueCreate();
2720 #else
2721     sendRdmaBuf = 0;
2722 #endif
2723 #if MACHINE_DEBUG_LOG
2724     char ln[200];
2725     sprintf(ln,"debugLog.%d",myrank);
2726     debugLog=fopen(ln,"w");
2727 #endif
2728
2729 }
2730
2731 void* LrtsAlloc(int n_bytes, int header)
2732 {
2733     void *ptr;
2734 #if 0
2735     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2736 #endif
2737     if(n_bytes <= SMSG_MAX_MSG)
2738     {
2739         int totalsize = n_bytes+header;
2740         ptr = malloc(totalsize);
2741     }
2742     else {
2743         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2744 #if     USE_LRTS_MEMPOOL
2745         n_bytes = ALIGN64(n_bytes);
2746         if(n_bytes < BIG_MSG)
2747         {
2748             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2749             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2750         }else 
2751         {
2752             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2753             ptr = res + ALIGNBUF - header;
2754
2755         }
2756 #else
2757         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2758         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2759         ptr = res + ALIGNBUF - header;
2760 #endif
2761     }
2762     return ptr;
2763 }
2764
2765 void  LrtsFree(void *msg)
2766 {
2767     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2768     if (size <= SMSG_MAX_MSG)
2769       free(msg);
2770     else if(size>=BIG_MSG)
2771     {
2772         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2773
2774     }else
2775     {
2776 #if     USE_LRTS_MEMPOOL
2777 #if CMK_SMP
2778         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2779 #else
2780         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2781 #endif
2782 #else
2783         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2784 #endif
2785     }
2786 }
2787
2788 void LrtsExit()
2789 {
2790     /* free memory ? */
2791 #if USE_LRTS_MEMPOOL
2792     mempool_destroy(CpvAccess(mempool));
2793 #endif
2794     PMI_Finalize();
2795     exit(0);
2796 }
2797
2798 void LrtsDrainResources()
2799 {
2800     if(mysize == 1) return;
2801     while (
2802 #if CMK_USE_OOB
2803            !SendBufferMsg(&smsg_oob_queue) ||
2804 #endif
2805            !SendBufferMsg(&smsg_queue))
2806     {
2807         if (useDynamicSMSG)
2808             PumpDatagramConnection();
2809         PumpNetworkSmsg();
2810         PumpLocalRdmaTransactions();
2811         SendRdmaMsg();
2812     }
2813     PMI_Barrier();
2814 }
2815
2816 void LrtsAbort(const char *message) {
2817     printf("CmiAbort is calling on PE:%d\n", myrank);
2818     CmiPrintStackTrace(0);
2819     PMI_Abort(-1, message);
2820 }
2821
2822 /**************************  TIMER FUNCTIONS **************************/
2823 #if CMK_TIMER_USE_SPECIAL
2824 /* MPI calls are not threadsafe, even the timer on some machines */
2825 static CmiNodeLock  timerLock = 0;
2826 static int _absoluteTime = 0;
2827 static int _is_global = 0;
2828 static struct timespec start_ts;
2829
2830 inline int CmiTimerIsSynchronized() {
2831     return 0;
2832 }
2833
2834 inline int CmiTimerAbsolute() {
2835     return _absoluteTime;
2836 }
2837
2838 double CmiStartTimer() {
2839     return 0.0;
2840 }
2841
2842 double CmiInitTime() {
2843     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
2844 }
2845
2846 void CmiTimerInit(char **argv) {
2847     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2848     if (_absoluteTime && CmiMyPe() == 0)
2849         printf("Charm++> absolute  timer is used\n");
2850     
2851     _is_global = CmiTimerIsSynchronized();
2852
2853
2854     if (_is_global) {
2855         if (CmiMyRank() == 0) {
2856             clock_gettime(CLOCK_MONOTONIC, &start_ts);
2857         }
2858     } else { /* we don't have a synchronous timer, set our own start time */
2859         CmiBarrier();
2860         CmiBarrier();
2861         CmiBarrier();
2862         clock_gettime(CLOCK_MONOTONIC, &start_ts);
2863     }
2864     CmiNodeAllBarrier();          /* for smp */
2865 }
2866
2867 /**
2868  * Since the timerLock is never created, and is
2869  * always NULL, then all the if-condition inside
2870  * the timer functions could be disabled right
2871  * now in the case of SMP.
2872  */
2873 double CmiTimer(void) {
2874     struct timespec now_ts;
2875     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2876     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2877         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2878 }
2879
2880 double CmiWallTimer(void) {
2881     struct timespec now_ts;
2882     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2883     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2884         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
2885 }
2886
2887 double CmiCpuTimer(void) {
2888     struct timespec now_ts;
2889     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2890     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2891         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2892 }
2893
2894 #endif
2895 /************Barrier Related Functions****************/
2896
2897 int CmiBarrier()
2898 {
2899     gni_return_t status;
2900
2901 #if CMK_SMP
2902     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2903     CmiNodeAllBarrier();
2904     if (CmiMyRank() == CmiMyNodeSize())
2905 #else
2906     if (CmiMyRank() == 0)
2907 #endif
2908     {
2909         /**
2910          *  The call of CmiBarrier is usually before the initialization
2911          *  of trace module of Charm++, therefore, the START_EVENT
2912          *  and END_EVENT are disabled here. -Chao Mei
2913          */
2914         /*START_EVENT();*/
2915         status = PMI_Barrier();
2916         GNI_RC_CHECK("PMI_Barrier", status);
2917         /*END_EVENT(10);*/
2918     }
2919     CmiNodeAllBarrier();
2920     return status;
2921
2922 }
2923 #if CMK_DIRECT
2924 #include "machine-cmidirect.c"
2925 #endif
2926 #if CMK_PERSISTENT_COMM
2927 #include "machine-persistent.c"
2928 #endif
2929
2930