Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     export CHARM_UGNI_MEMPOOL_SIZE=8M
12     export CHARM_UGNI_MEMPOOL_MAX=20M
13     export CHARM_UGNI_SEND_MAX=10M
14
15     CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
16     CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
17  */
18 /*@{*/
19
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <stdint.h>
23 #include <errno.h>
24 #include <malloc.h>
25 #include <unistd.h>
26
27 #include <gni_pub.h>
28 #include <pmi.h>
29 #include <time.h>
30
31 #include "converse.h"
32
33 #define USE_OOB                     0
34
35 #define PRINT_SYH  0
36
37 // Trace communication thread
38 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
39 #define TRACE_THRESHOLD     0.00005
40 #define CMI_MPI_TRACE_MOREDETAILED 0
41 #undef CMI_MPI_TRACE_USEREVENTS
42 #define CMI_MPI_TRACE_USEREVENTS 1
43 #else
44 #undef CMK_SMP_TRACE_COMMTHREAD
45 #define CMK_SMP_TRACE_COMMTHREAD 0
46 #endif
47
48 #define CMK_TRACE_COMMOVERHEAD 0
49 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
50 #undef CMI_MPI_TRACE_USEREVENTS
51 #define CMI_MPI_TRACE_USEREVENTS 1
52 #else
53 #undef CMK_TRACE_COMMOVERHEAD
54 #define CMK_TRACE_COMMOVERHEAD 0
55 #endif
56
57 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
58 CpvStaticDeclare(double, projTraceStart);
59 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
60 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
61 #else
62 #define  START_EVENT()
63 #define  END_EVENT(x)
64 #endif
65
66 #define CMI_EXERT_SEND_CAP      0
67 #define CMI_EXERT_RECV_CAP      0
68
69 #if CMI_EXERT_SEND_CAP
70 #define SEND_CAP 16
71 #endif
72
73 #if CMI_EXERT_RECV_CAP
74 #define RECV_CAP 2
75 #endif
76
77
78
79 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
80
81 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
82
83 static int avg_smsg_connection = 32;
84 static int                 *smsg_connected_flag= 0;
85 static gni_smsg_attr_t     **smsg_attr_vector_local;
86 static gni_smsg_attr_t     **smsg_attr_vector_remote;
87 static gni_ep_handle_t     ep_hndl_unbound;
88 static gni_smsg_attr_t     send_smsg_attr;
89 static gni_smsg_attr_t     recv_smsg_attr;
90
91 typedef struct _dynamic_smsg_mailbox{
92    void     *mailbox_base;
93    int      size;
94    int      offset;
95    gni_mem_handle_t  mem_hndl;
96    struct      _dynamic_smsg_mailbox  *next;
97 }dynamic_smsg_mailbox_t;
98
99 static dynamic_smsg_mailbox_t  *mailbox_list;
100
101 #define REMOTE_EVENT                      0
102 #define USE_LRTS_MEMPOOL                  1
103
104 #if USE_LRTS_MEMPOOL
105 #if CMK_SMP
106 #define STEAL_MEMPOOL                     0
107 #endif
108
109 #define oneMB (1024ll*1024)
110 static CmiInt8 _mempool_size = 8*oneMB;
111 static CmiInt8 _expand_mem =  4*oneMB;
112 #endif
113
114 #if CMK_SMP
115 //Dynamic flow control about memory registration
116 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
117 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
118 #else
119 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
120 static CmiInt8  MAX_REG_MEM    =  20*oneMB;
121 #endif
122
123 static CmiInt8 buffered_send_msg = 0;
124
125 #define BIG_MSG                  16*oneMB
126 #define ONE_SEG                  4*oneMB
127 #define BIG_MSG_PIPELINE         4
128
129 #if CMK_SMP
130 #define COMM_THREAD_SEND 1
131 #endif
132
133 int         rdma_id = 0;
134
135 #if PRINT_SYH
136 int         lrts_smsg_success = 0;
137 int         lrts_send_msg_id = 0;
138 int         lrts_send_rdma_success = 0;
139 int         lrts_local_done_msg = 0;
140 #endif
141
142 #include "machine.h"
143
144 #include "pcqueue.h"
145
146 #include "mempool.h"
147
148 #if CMK_PERSISTENT_COMM
149 #include "machine-persistent.h"
150 #endif
151
152 //#define  USE_ONESIDED 1
153 #ifdef USE_ONESIDED
154 //onesided implementation is wrong, since no place to restore omdh
155 #include "onesided.h"
156 onesided_hnd_t   onesided_hnd;
157 onesided_md_t    omdh;
158 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
159
160 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
161
162 #else
163 uint8_t   onesided_hnd, omdh;
164 #if REMOTE_EVENT
165 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
166 #else
167 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
168 #endif
169 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
170 #endif
171
172 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
173 #define   DecreaseMsgInRecv(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)--
174 #define   IncreaseMsgInSend(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)++
175                                    //printf("++++[%d]%p   ----- %d\n", myrank, ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
176 #define   DecreaseMsgInSend(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)--
177                                      //printf("---[%d]%p   ----- %d\n", myrank,((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
178 #define   GetMempoolBlockPtr(x)  ((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr
179 #define   GetMempoolPtr(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mptr
180 #define   GetMempoolsize(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->size
181 #define   GetMemHndl(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mem_hndl
182 #define   GetMemHndlFromHeader(x) ((block_header*)x)->mem_hndl
183 #define   GetSizeFromHeader(x) ((block_header*)x)->size
184 #define   NoMsgInSend(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send == 0
185 #define   NoMsgInRecv(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv == 0
186 #define   NoMsgInFlight(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send + ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv  == 0
187 #define   IsMemHndlZero(x)  (x.qword1 == 0 && x.qword2 == 0)
188 #define   SetMemHndlZero(x)  x.qword1 = 0; x.qword2 = 0
189 #define   NotRegistered(x)  IsMemHndlZero(((block_header*)x)->mem_hndl)
190
191 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
192 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
193 #define CmiGetMsgSeq(m)  ((CmiMsgHeaderExt*)m)->seq
194 #define CmiSetMsgSeq(m, s)  ((((CmiMsgHeaderExt*)m)->seq) = (s))
195
196 #define ALIGNBUF                64
197
198 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
199 /* If SMSG is not used */
200
201 #define FMA_PER_CORE  1024
202 #define FMA_BUFFER_SIZE 1024
203
204 /* If SMSG is used */
205 static int  SMSG_MAX_MSG = 1024;
206 #define SMSG_MAX_CREDIT 72 
207
208 #define MSGQ_MAXSIZE       2048
209 /* large message transfer with FMA or BTE */
210 #define LRTS_GNI_RDMA_THRESHOLD  1024 
211
212 #if CMK_SMP
213 static int  REMOTE_QUEUE_ENTRIES=163840; 
214 static int LOCAL_QUEUE_ENTRIES=163840; 
215 #else
216 static int  REMOTE_QUEUE_ENTRIES=20480;
217 static int LOCAL_QUEUE_ENTRIES=20480; 
218 #endif
219
220 #define BIG_MSG_TAG  0x26
221 #define PUT_DONE_TAG      0x28
222 #define DIRECT_PUT_DONE_TAG      0x29
223 #define ACK_TAG           0x30
224 /* SMSG is data message */
225 #define SMALL_DATA_TAG          0x31
226 /* SMSG is a control message to initialize a BTE */
227 #define MEDIUM_HEAD_TAG         0x32
228 #define MEDIUM_DATA_TAG         0x33
229 #define LMSG_INIT_TAG           0x39 
230 #define VERY_LMSG_INIT_TAG      0x40 
231 #define VERY_LMSG_TAG           0x41 
232
233 #define DEBUG
234 #ifdef GNI_RC_CHECK
235 #undef GNI_RC_CHECK
236 #endif
237 #ifdef DEBUG
238 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
239 #else
240 #define GNI_RC_CHECK(msg,rc)
241 #endif
242
243 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
244 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
245
246 static int useStaticMSGQ = 0;
247 static int useStaticFMA = 0;
248 static int mysize, myrank;
249 gni_nic_handle_t      nic_hndl;
250
251 typedef struct {
252     gni_mem_handle_t mdh;
253     uint64_t addr;
254 } mdh_addr_t ;
255 // this is related to dynamic SMSG
256
257 typedef struct mdh_addr_list{
258     gni_mem_handle_t mdh;
259    void *addr;
260     struct mdh_addr_list *next;
261 }mdh_addr_list_t;
262
263 static unsigned int         smsg_memlen;
264 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
265 mdh_addr_t          setup_mem;
266 mdh_addr_t          *smsg_connection_vec = 0;
267 gni_mem_handle_t    smsg_connection_memhndl;
268 static int          smsg_expand_slots = 10;
269 static int          smsg_available_slot = 0;
270 static void         *smsg_mailbox_mempool = 0;
271 mdh_addr_list_t     *smsg_dynamic_list = 0;
272
273 static void             *smsg_mailbox_base;
274 gni_msgq_attr_t         msgq_attrs;
275 gni_msgq_handle_t       msgq_handle;
276 gni_msgq_ep_attr_t      msgq_ep_attrs;
277 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
278
279 /* =====Beginning of Declarations of Machine Specific Variables===== */
280 static int cookie;
281 static int modes = 0;
282 static gni_cq_handle_t       smsg_rx_cqh = NULL;
283 static gni_cq_handle_t       smsg_tx_cqh = NULL;
284 static gni_cq_handle_t       post_rx_cqh = NULL;
285 static gni_cq_handle_t       post_tx_cqh = NULL;
286 static gni_ep_handle_t       *ep_hndl_array;
287 #if CMK_SMP && !COMM_THREAD_SEND
288 static CmiNodeLock           *ep_lock_array;
289 static CmiNodeLock           tx_cq_lock; 
290 static CmiNodeLock           rx_cq_lock; 
291 #endif
292 typedef struct msg_list
293 {
294     uint32_t destNode;
295     uint32_t size;
296     void *msg;
297     uint8_t tag;
298 #if !CMK_SMP
299     struct msg_list *next;
300 #endif
301 }MSG_LIST;
302
303
304 typedef struct control_msg
305 {
306     uint64_t            source_addr;    /* address from the start of buffer  */
307     uint64_t            dest_addr;      /* address from the start of buffer */
308     //int                 source;         /* source rank */
309     int                 total_length;   /* total length */
310     int                 length;         /* length of this packet */
311     uint8_t             seq_id;         //big message   0 meaning single message
312     gni_mem_handle_t    source_mem_hndl;
313     struct control_msg *next;
314 }CONTROL_MSG;
315
316 #ifdef CMK_DIRECT
317 typedef struct{
318     void *recverBuf;
319     void (*callbackFnPtr)(void *);
320     void *callbackData;
321 }CMK_DIRECT_HEADER;
322 #endif
323 typedef struct  rmda_msg
324 {
325     int                   destNode;
326     gni_post_descriptor_t *pd;
327 #if !CMK_SMP
328     struct  rmda_msg      *next;
329 #endif
330 }RDMA_REQUEST;
331
332 #if CMK_SMP
333 PCQueue sendRdmaBuf;
334 typedef struct  msg_list_index
335 {
336     int         next;
337     PCQueue     sendSmsgBuf;
338 } MSG_LIST_INDEX;
339 #else
340 static RDMA_REQUEST        *sendRdmaBuf = 0;
341 static RDMA_REQUEST        *sendRdmaTail = 0;
342 typedef struct  msg_list_index
343 {
344     int         next;
345     MSG_LIST    *sendSmsgBuf;
346     MSG_LIST    *tail;
347 } MSG_LIST_INDEX;
348 #endif
349
350 /* reuse PendingMsg memory */
351 static CONTROL_MSG          *control_freelist=0;
352 static MSG_LIST             *msglist_freelist=0;
353
354 typedef struct smsg_queue
355 {
356     MSG_LIST_INDEX   *smsg_msglist_index;
357     int               smsg_head_index;
358 } SMSG_QUEUE;
359
360 SMSG_QUEUE                  smsg_queue;
361 SMSG_QUEUE                  smsg_oob_queue;
362
363 #if CMK_SMP
364
365 #define FreeMsgList(d)   free(d);
366 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
367
368 #else
369
370 #define FreeMsgList(d)  \
371   (d)->next = msglist_freelist;\
372   msglist_freelist = d;
373
374 #define MallocMsgList(d) \
375   d = msglist_freelist;\
376   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
377              _MEMCHECK(d);\
378   } else msglist_freelist = d->next; \
379   d->next =0;
380 #endif
381
382 #if CMK_SMP
383
384 #define FreeControlMsg(d)      free(d);
385 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
386
387 #else
388
389 #define FreeControlMsg(d)       \
390   (d)->next = control_freelist;\
391   control_freelist = d;
392
393 #define MallocControlMsg(d) \
394   d = control_freelist;\
395   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
396              _MEMCHECK(d);\
397   } else control_freelist = d->next;
398
399 #endif
400
401 static RDMA_REQUEST         *rdma_freelist = NULL;
402
403 #define FreeMediumControlMsg(d)       \
404   (d)->next = medium_control_freelist;\
405   medium_control_freelist = d;
406
407
408 #define MallocMediumControlMsg(d) \
409     d = medium_control_freelist;\
410     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
411     _MEMCHECK(d);\
412 } else mediumcontrol_freelist = d->next;
413
414 # if CMK_SMP
415 #define FreeRdmaRequest(d)       free(d);
416 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
417 #else
418
419 #define FreeRdmaRequest(d)       \
420   (d)->next = rdma_freelist;\
421   rdma_freelist = d;
422
423 #define MallocRdmaRequest(d) \
424   d = rdma_freelist;\
425   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
426              _MEMCHECK(d);\
427   } else rdma_freelist = d->next; \
428     d->next =0;
429 #endif
430
431 /* reuse gni_post_descriptor_t */
432 static gni_post_descriptor_t *post_freelist=0;
433
434 #if  !CMK_SMP
435 #define FreePostDesc(d)       \
436     (d)->next_descr = post_freelist;\
437     post_freelist = d;
438
439 #define MallocPostDesc(d) \
440   d = post_freelist;\
441   if (d==0) { \
442      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
443      _MEMCHECK(d);\
444   } else post_freelist = d->next_descr;
445 #else
446
447 #define FreePostDesc(d)     free(d);
448 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
449
450 #endif
451
452
453 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
454 static int      buffered_smsg_counter = 0;
455
456 /* SmsgSend return success but message sent is not confirmed by remote side */
457 static MSG_LIST *buffered_fma_head = 0;
458 static MSG_LIST *buffered_fma_tail = 0;
459
460 /* functions  */
461 #define IsFree(a,ind)  !( a& (1<<(ind) ))
462 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
463 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
464
465 CpvDeclare(mempool_type*, mempool);
466
467 /* get the upper bound of log 2 */
468 int mylog2(int size)
469 {
470     int op = size;
471     unsigned int ret=0;
472     unsigned int mask = 0;
473     int i;
474     while(op>0)
475     {
476         op = op >> 1;
477         ret++;
478
479     }
480     for(i=1; i<ret; i++)
481     {
482         mask = mask << 1;
483         mask +=1;
484     }
485
486     ret -= ((size &mask) ? 0:1);
487     return ret;
488 }
489
490 static void
491 allgather(void *in,void *out, int len)
492 {
493     static int *ivec_ptr=NULL,already_called=0,job_size=0;
494     int i,rc;
495     int my_rank;
496     char *tmp_buf,*out_ptr;
497
498     if(!already_called) {
499
500         rc = PMI_Get_size(&job_size);
501         CmiAssert(rc == PMI_SUCCESS);
502         rc = PMI_Get_rank(&my_rank);
503         CmiAssert(rc == PMI_SUCCESS);
504
505         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
506         CmiAssert(ivec_ptr != NULL);
507
508         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
509         CmiAssert(rc == PMI_SUCCESS);
510
511         already_called = 1;
512
513     }
514
515     tmp_buf = (char *)malloc(job_size * len);
516     CmiAssert(tmp_buf);
517
518     rc = PMI_Allgather(in,tmp_buf,len);
519     CmiAssert(rc == PMI_SUCCESS);
520
521     out_ptr = out;
522
523     for(i=0;i<job_size;i++) {
524
525         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
526
527     }
528
529     free(tmp_buf);
530 }
531
532 static void
533 allgather_2(void *in,void *out, int len)
534 {
535     //PMI_Allgather is out of order
536     int i,rc, extend_len;
537     int  rank_index;
538     char *out_ptr, *out_ref;
539     char *in2;
540
541     extend_len = sizeof(int) + len;
542     in2 = (char*)malloc(extend_len);
543
544     memcpy(in2, &myrank, sizeof(int));
545     memcpy(in2+sizeof(int), in, len);
546
547     out_ptr = (char*)malloc(mysize*extend_len);
548
549     rc = PMI_Allgather(in2, out_ptr, extend_len);
550     GNI_RC_CHECK("allgather", rc);
551
552     out_ref = out;
553
554     for(i=0;i<mysize;i++) {
555         //rank index 
556         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
557         //copy to the rank index slot
558         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
559     }
560
561     free(out_ptr);
562     free(in2);
563
564 }
565
566 static unsigned int get_gni_nic_address(int device_id)
567 {
568     unsigned int address, cpu_id;
569     gni_return_t status;
570     int i, alps_dev_id=-1,alps_address=-1;
571     char *token, *p_ptr;
572
573     p_ptr = getenv("PMI_GNI_DEV_ID");
574     if (!p_ptr) {
575         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
576        
577         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
578     } else {
579         while ((token = strtok(p_ptr,":")) != NULL) {
580             alps_dev_id = atoi(token);
581             if (alps_dev_id == device_id) {
582                 break;
583             }
584             p_ptr = NULL;
585         }
586         CmiAssert(alps_dev_id != -1);
587         p_ptr = getenv("PMI_GNI_LOC_ADDR");
588         CmiAssert(p_ptr != NULL);
589         i = 0;
590         while ((token = strtok(p_ptr,":")) != NULL) {
591             if (i == alps_dev_id) {
592                 alps_address = atoi(token);
593                 break;
594             }
595             p_ptr = NULL;
596             ++i;
597         }
598         CmiAssert(alps_address != -1);
599         address = alps_address;
600     }
601     return address;
602 }
603
604 static uint8_t get_ptag(void)
605 {
606     char *p_ptr, *token;
607     uint8_t ptag;
608
609     p_ptr = getenv("PMI_GNI_PTAG");
610     CmiAssert(p_ptr != NULL);
611     token = strtok(p_ptr, ":");
612     ptag = (uint8_t)atoi(token);
613     return ptag;
614         
615 }
616
617 static uint32_t get_cookie(void)
618 {
619     uint32_t cookie;
620     char *p_ptr, *token;
621
622     p_ptr = getenv("PMI_GNI_COOKIE");
623     CmiAssert(p_ptr != NULL);
624     token = strtok(p_ptr, ":");
625     cookie = (uint32_t)atoi(token);
626
627     return cookie;
628 }
629
630 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
631 /* TODO: add any that are related */
632 /* =====End of Definitions of Message-Corruption Related Macros=====*/
633
634
635 #include "machine-lrts.h"
636 #include "machine-common-core.c"
637
638 /* Network progress function is used to poll the network when for
639    messages. This flushes receive buffers on some  implementations*/
640 #if CMK_MACHINE_PROGRESS_DEFINED
641 void CmiMachineProgressImpl() {
642 }
643 #endif
644
645 static void SendRdmaMsg();
646 static void PumpNetworkSmsg();
647 static void PumpLocalRdmaTransactions();
648 static int SendBufferMsg();
649 static     int register_mempool = 0;
650
651 #if MACHINE_DEBUG_LOG
652 FILE *debugLog = NULL;
653 static CmiInt8 buffered_recv_msg = 0;
654 int         lrts_smsg_success = 0;
655 int         lrts_received_msg = 0;
656 #endif
657 static void sweep_mempool(mempool_type *mptr)
658 {
659     block_header *current = &(mptr->block_head);
660
661 printf("[n %d] sweep_mempool slot START .\n", myrank);
662     while( current!= NULL) {
663 printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
664         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
665     }
666 printf("[n %d] sweep_mempool slot END .\n", myrank);
667 }
668
669 inline
670 static  gni_return_t deregisterMempool(mempool_type *mptr, block_header **from)
671 {
672     gni_return_t status;
673     block_header *current = *from;
674
675     while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
676        current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
677
678     *from = current;
679     if(current == NULL) return GNI_RC_ERROR_RESOURCE;
680     status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh);
681     GNI_RC_CHECK("registerMemorypool de-register", status);
682     register_mempool -= GetSizeFromHeader(current);
683     SetMemHndlZero(GetMemHndlFromHeader(current));
684     return GNI_RC_SUCCESS;
685 }
686
687 inline 
688 static gni_return_t registerFromMempool(mempool_type *mptr, void *msg)
689 {
690     gni_return_t status = GNI_RC_SUCCESS;
691     int size = GetMempoolsize(msg);
692     void *blockaddr = GetMempoolBlockPtr(msg);
693     gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
694    
695     block_header *current = &(mptr->block_head);
696
697     while(register_mempool > MAX_REG_MEM)
698     {
699         status = deregisterMempool(mptr, &current);
700         if (status != GNI_RC_SUCCESS) break;
701     };
702     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_mempool); 
703     while(1)
704     {
705         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh);
706         if(status == GNI_RC_SUCCESS)
707         {
708             register_mempool += size;
709             break;
710         }
711         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
712         {
713             CmiAbort("Memory registor for mempool fails\n");
714         }
715         else
716         {
717             status = deregisterMempool(mptr, &current);
718             if (status != GNI_RC_SUCCESS) break;
719         }
720     }; 
721     return status;
722 }
723
724 inline 
725 static gni_return_t registerMempool(void *msg)
726 {
727     static int rank = -1;
728     int i;
729     gni_return_t status;
730     mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
731     mempool_type *mptr;
732
733     status = registerFromMempool(mptr1, msg);
734     if (status == GNI_RC_SUCCESS) return status;
735 #if CMK_SMP
736     for (i=0; i<CmiMyNodeSize()+1; i++) {
737       rank = (rank+1)%(CmiMyNodeSize()+1);
738       mptr = CpvAccessOther(mempool, rank);
739       if (mptr == mptr1) continue;
740       status = registerFromMempool(mptr, msg);
741       if (status == GNI_RC_SUCCESS) return status;
742     }
743 #endif
744     return  GNI_RC_ERROR_RESOURCE;
745 }
746
747 inline
748 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
749 {
750     MSG_LIST        *msg_tmp;
751     MallocMsgList(msg_tmp);
752     msg_tmp->destNode = destNode;
753     msg_tmp->size   = size;
754     msg_tmp->msg    = msg;
755     msg_tmp->tag    = tag;
756
757 #if !CMK_SMP
758     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
759         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
760         queue->smsg_head_index = destNode;
761         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
762     }else
763     {
764         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
765         queue->smsg_msglist_index[destNode].tail = msg_tmp;
766     }
767 #else
768     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
769 #endif
770 #if PRINT_SYH
771     buffered_smsg_counter++;
772 #endif
773 }
774
775 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
776 {
777     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
778 }
779
780 inline
781 static void setup_smsg_connection(int destNode)
782 {
783     mdh_addr_list_t  *new_entry = 0;
784     gni_post_descriptor_t *pd;
785     gni_smsg_attr_t      *smsg_attr;
786     gni_return_t status = GNI_RC_NOT_DONE;
787     RDMA_REQUEST        *rdma_request_msg;
788     
789     if(smsg_available_slot == smsg_expand_slots)
790     {
791         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
792         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
793         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
794
795         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
796             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
797             GNI_MEM_READWRITE,   
798             -1,
799             &(new_entry->mdh));
800         smsg_available_slot = 0; 
801         new_entry->next = smsg_dynamic_list;
802         smsg_dynamic_list = new_entry;
803     }
804     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
805     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
806     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
807     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
808     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
809     smsg_attr->buff_size = smsg_memlen;
810     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
811     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
812     smsg_local_attr_vec[destNode] = smsg_attr;
813     smsg_available_slot++;
814     MallocPostDesc(pd);
815     pd->type            = GNI_POST_FMA_PUT;
816     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
817     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
818     pd->length          = sizeof(gni_smsg_attr_t);
819     pd->local_addr      = (uint64_t) smsg_attr;
820     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
821     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
822     pd->src_cq_hndl     = 0;
823     pd->rdma_mode       = 0;
824     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
825     print_smsg_attr(smsg_attr);
826     if(status == GNI_RC_ERROR_RESOURCE )
827     {
828         MallocRdmaRequest(rdma_request_msg);
829         rdma_request_msg->destNode = destNode;
830         rdma_request_msg->pd = pd;
831         /* buffer this request */
832     }
833 #if PRINT_SYH
834     if(status != GNI_RC_SUCCESS)
835        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
836     else
837         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
838 #endif
839 }
840
841 /* useDynamicSMSG */
842 inline 
843 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
844 {
845     gni_return_t status = GNI_RC_NOT_DONE;
846
847     if(mailbox_list->offset == mailbox_list->size)
848     {
849         dynamic_smsg_mailbox_t *new_mailbox_entry;
850         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
851         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
852         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
853         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
854         new_mailbox_entry->offset = 0;
855         
856         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
857             new_mailbox_entry->size, smsg_rx_cqh,
858             GNI_MEM_READWRITE,   
859             -1,
860             &(new_mailbox_entry->mem_hndl));
861
862         GNI_RC_CHECK("register", status);
863         new_mailbox_entry->next = mailbox_list;
864         mailbox_list = new_mailbox_entry;
865     }
866     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
867     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
868     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
869     local_smsg_attr->mbox_offset = mailbox_list->offset;
870     mailbox_list->offset += smsg_memlen;
871     local_smsg_attr->buff_size = smsg_memlen;
872     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
873     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
874 }
875
876 /* useDynamicSMSG */
877 inline 
878 static int connect_to(int destNode)
879 {
880     gni_return_t status = GNI_RC_NOT_DONE;
881     CmiAssert(smsg_connected_flag[destNode] == 0);
882     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
883     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
884     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
885     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
886             
887 #if CMK_SMP && !COMM_THREAD_SEND
888     //CmiLock(ep_lock_array[destNode]);
889     CmiLock(tx_cq_lock);
890 #endif
891     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
892 #if CMK_SMP && !COMM_THREAD_SEND
893     //CmiUnlock(ep_lock_array[destNode]);
894     CmiUnlock(tx_cq_lock);
895 #endif
896     if (status == GNI_RC_ERROR_RESOURCE) {
897       /* possibly destNode is making connection at the same time */
898       free(smsg_attr_vector_local[destNode]);
899       smsg_attr_vector_local[destNode] = NULL;
900       free(smsg_attr_vector_remote[destNode]);
901       smsg_attr_vector_remote[destNode] = NULL;
902       mailbox_list->offset -= smsg_memlen;
903       return 0;
904     }
905     GNI_RC_CHECK("GNI_Post", status);
906     smsg_connected_flag[destNode] = 1;
907     return 1;
908 }
909
910 //inline static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
911 inline static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
912 {
913     unsigned int          remote_address;
914     uint32_t              remote_id;
915     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
916     gni_smsg_attr_t       *smsg_attr;
917     gni_post_descriptor_t *pd;
918     gni_post_state_t      post_state;
919     char                  *real_data; 
920
921     if (useDynamicSMSG) {
922         switch (smsg_connected_flag[destNode]) {
923         case 0: 
924             connect_to(destNode);         /* continue to case 1 */
925         case 1:                           /* pending connection, do nothing */
926             status = GNI_RC_NOT_DONE;
927             if(inbuff ==0)
928                 buffer_small_msgs(queue, msg, size, destNode, tag);
929             return status;
930         }
931     }
932 #if CMK_SMP
933     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
934     {
935 #else
936     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
937     {
938 #endif
939 #if CMK_SMP && !COMM_THREAD_SEND
940         CmiLock(tx_cq_lock);
941 #endif
942         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
943 #if CMK_SMP && !COMM_THREAD_SEND
944         CmiUnlock(tx_cq_lock);
945 #endif
946         if(status == GNI_RC_SUCCESS)
947         {
948 #if CMK_SMP_TRACE_COMMTHREAD
949             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
950             { 
951                 START_EVENT();
952                 if ( tag == SMALL_DATA_TAG)
953                     real_data = (char*)msg; 
954                 else 
955                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
956                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
957             }
958 #endif
959             return status;
960         }else
961             status = GNI_RC_ERROR_RESOURCE;
962     }
963     if(inbuff ==0)
964         buffer_small_msgs(queue, msg, size, destNode, tag);
965     return status;
966 }
967
968
969 inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
970 {
971     /* construct a control message and send */
972     CONTROL_MSG         *control_msg_tmp;
973     MallocControlMsg(control_msg_tmp);
974     control_msg_tmp->source_addr = (uint64_t)msg;
975     control_msg_tmp->seq_id    = seqno;
976     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
977 #if     USE_LRTS_MEMPOOL
978     if(size < BIG_MSG)
979     {
980         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
981     }
982     else
983     {
984         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
985         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
986         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
987     }
988 #else
989     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
990 #endif
991     return control_msg_tmp;
992 }
993
994 // Large message, send control to receiver, receiver register memory and do a GET, 
995 // return 1 - send no success
996 inline
997 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
998 {
999     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1000     uint32_t            vmdh_index  = -1;
1001     int                 size;
1002     int                 offset = 0;
1003     uint64_t            source_addr;
1004     int                 register_size; 
1005
1006     size    =   control_msg_tmp->total_length;
1007     source_addr = control_msg_tmp->source_addr;
1008     register_size = control_msg_tmp->length;
1009
1010 #if     USE_LRTS_MEMPOOL
1011     if( control_msg_tmp->seq_id == 0 ){
1012         if(buffered_send_msg >= MAX_BUFF_SEND)
1013         {
1014             if(!inbuff)
1015                 buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1016             return GNI_RC_ERROR_NOMEM;
1017         }
1018         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1019         {
1020             //register the corresponding mempool
1021             status = registerMempool((void*)source_addr);
1022             if(status == GNI_RC_SUCCESS)
1023             {
1024                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1025             }
1026         }else
1027         {
1028             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1029             status = GNI_RC_SUCCESS;
1030         }
1031         if(NoMsgInSend( control_msg_tmp->source_addr))
1032             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1033         else
1034             register_size = 0;
1035     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1036     {
1037         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1038         source_addr += offset;
1039         size = control_msg_tmp->length;
1040         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
1041         register_size = 0;  
1042     }
1043
1044     if(status == GNI_RC_SUCCESS)
1045     {
1046         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
1047         if(status == GNI_RC_SUCCESS)
1048         {
1049             buffered_send_msg += register_size;
1050             if(control_msg_tmp->seq_id == 0)
1051             {
1052                 IncreaseMsgInSend(source_addr);
1053             }
1054             FreeControlMsg(control_msg_tmp);
1055             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_mempool, LMSG_INIT_TAG); 
1056         }else
1057             status = GNI_RC_ERROR_RESOURCE;
1058
1059     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1060     {
1061         CmiAbort("Memory registor for large msg\n");
1062     }else 
1063     {
1064         status = GNI_RC_ERROR_NOMEM; 
1065         if(!inbuff)
1066             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1067     }
1068     return status;
1069 #else
1070     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
1071     if(status == GNI_RC_SUCCESS)
1072     {
1073         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
1074         if(status == GNI_RC_SUCCESS)
1075         {
1076             FreeControlMsg(control_msg_tmp);
1077         }
1078     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1079     {
1080         CmiAbort("Memory registor for large msg\n");
1081     }else 
1082     {
1083         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1084     }
1085     return status;
1086 #endif
1087 }
1088
1089 inline void LrtsPrepareEnvelope(char *msg, int size)
1090 {
1091     CmiSetMsgSize(msg, size);
1092 }
1093
1094 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1095 {
1096     gni_return_t        status  =   GNI_RC_SUCCESS;
1097     uint8_t tag;
1098     CONTROL_MSG         *control_msg_tmp;
1099     int                 oob = ( mode & OUT_OF_BAND);
1100     SMSG_QUEUE          *queue;
1101
1102 #if USE_OOB
1103     queue = oob? &smsg_oob_queue : &smsg_queue;
1104 #else
1105     queue = &smsg_queue;
1106 #endif
1107
1108     LrtsPrepareEnvelope(msg, size);
1109
1110 #if PRINT_SYH
1111     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1112 #endif 
1113 #if CMK_SMP && COMM_THREAD_SEND
1114     if(size <= SMSG_MAX_MSG)
1115         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1116     else if (size < BIG_MSG) {
1117         control_msg_tmp =  construct_control_msg(size, msg, 0);
1118         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1119     }
1120     else {
1121           CmiSetMsgSeq(msg, 0);
1122           control_msg_tmp =  construct_control_msg(size, msg, 1);
1123           buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1124     }
1125 #else //non-smp, smp(worker sending)
1126     if(size <= SMSG_MAX_MSG)
1127     {
1128         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0))
1129             CmiFree(msg);
1130     }
1131     else if (size < BIG_MSG) {
1132         control_msg_tmp =  construct_control_msg(size, msg, 0);
1133         send_large_messages(queue, destNode, control_msg_tmp, 0);
1134     }
1135     else {
1136 #if     USE_LRTS_MEMPOOL
1137         CmiSetMsgSeq(msg, 0);
1138         control_msg_tmp =  construct_control_msg(size, msg, 1);
1139         send_large_messages(queue, destNode, control_msg_tmp, 0);
1140 #else
1141         control_msg_tmp =  construct_control_msg(size, msg, 0);
1142         send_large_messages(queue, destNode, control_msg_tmp, 0);
1143 #endif
1144     }
1145 #endif
1146     return 0;
1147 }
1148
1149 static void    PumpDatagramConnection();
1150 static void registerUserTraceEvents() {
1151 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1152     traceRegisterUserEvent("setting up connections", 10);
1153     traceRegisterUserEvent("Receiving small msgs", 20);
1154     traceRegisterUserEvent("Release local transaction", 30);
1155     traceRegisterUserEvent("Sending buffered small msgs", 40);
1156     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1157 #endif
1158 }
1159
1160 void LrtsPostCommonInit(int everReturn)
1161 {
1162 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1163     CpvInitialize(double, projTraceStart);
1164     /* only PE 0 needs to care about registration (to generate sts file). */
1165     if (CmiMyPe() == 0) {
1166         registerMachineUserEventsFunction(&registerUserTraceEvents);
1167     }
1168 #endif
1169
1170 #if CMK_SMP
1171     CmiIdleState *s=CmiNotifyGetState();
1172     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1173     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1174 #else
1175     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1176     if (useDynamicSMSG)
1177     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1178 #endif
1179 }
1180
1181 /* this is called by worker thread */
1182 void LrtsPostNonLocal(){
1183 #if CMK_SMP
1184 #if !COMM_THREAD_SEND
1185     if(mysize == 1) return;
1186     PumpLocalRdmaTransactions();
1187     SendBufferMsg();
1188     SendRdmaMsg();
1189 #endif
1190 #endif
1191 }
1192
1193 /* useDynamicSMSG */
1194 static void    PumpDatagramConnection()
1195 {
1196     uint32_t          remote_address;
1197     uint32_t          remote_id;
1198     gni_return_t status;
1199     gni_post_state_t  post_state;
1200     uint64_t          datagram_id;
1201     int i;
1202
1203    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1204    {
1205        if (datagram_id >= mysize) {           /* bound endpoint */
1206            int pe = datagram_id - mysize;
1207 #if CMK_SMP && !COMM_THREAD_SEND
1208            CmiLock(tx_cq_lock);
1209 #endif
1210            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1211 #if CMK_SMP && !COMM_THREAD_SEND
1212            CmiUnlock(tx_cq_lock);
1213 #endif
1214            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1215            {
1216                CmiAssert(remote_id == pe);
1217                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1218                GNI_RC_CHECK("Dynamic SMSG Init", status);
1219 #if PRINT_SYH
1220                printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1221 #endif
1222                CmiAssert(smsg_connected_flag[pe] == 1);
1223                smsg_connected_flag[pe] = 2;
1224            }
1225        }
1226        else {         /* unbound ep */
1227            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1228            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1229            {
1230                CmiAssert(remote_id<mysize);
1231                CmiAssert(smsg_connected_flag[remote_id] <= 0);
1232                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1233                GNI_RC_CHECK("Dynamic SMSG Init", status);
1234 #if PRINT_SYH
1235                printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1236 #endif
1237                smsg_connected_flag[remote_id] = 2;
1238
1239                alloc_smsg_attr(&send_smsg_attr);
1240                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1241                GNI_RC_CHECK("post unbound datagram", status);
1242            }
1243        }
1244    }
1245 }
1246
1247 /* pooling CQ to receive network message */
1248 static void PumpNetworkRdmaMsgs()
1249 {
1250     gni_cq_entry_t      event_data;
1251     gni_return_t        status;
1252
1253 }
1254
1255 inline 
1256 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1257 {
1258     RDMA_REQUEST        *rdma_request_msg;
1259     MallocRdmaRequest(rdma_request_msg);
1260     rdma_request_msg->destNode = inst_id;
1261     rdma_request_msg->pd = pd;
1262 #if CMK_SMP
1263     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1264 #else
1265     if(sendRdmaBuf == 0)
1266     {
1267         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1268     }else{
1269         sendRdmaTail->next = rdma_request_msg;
1270         sendRdmaTail =  rdma_request_msg;
1271     }
1272 #endif
1273
1274 }
1275 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1276 static void PumpNetworkSmsg()
1277 {
1278     uint64_t            inst_id;
1279     int                 ret;
1280     gni_cq_entry_t      event_data;
1281     gni_return_t        status;
1282     void                *header;
1283     uint8_t             msg_tag;
1284     int                 msg_nbytes;
1285     void                *msg_data;
1286     gni_mem_handle_t    msg_mem_hndl;
1287     gni_smsg_attr_t     *smsg_attr;
1288     gni_smsg_attr_t     *remote_smsg_attr;
1289     int                 init_flag;
1290     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1291     uint64_t            source_addr;
1292     SMSG_QUEUE         *queue = &smsg_queue;
1293
1294 #if CMK_SMP && !COMM_THREAD_SEND
1295     while(1)
1296     {
1297         CmiLock(tx_cq_lock);
1298         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1299         CmiUnlock(tx_cq_lock);
1300         if(status != GNI_RC_SUCCESS)
1301             break;
1302 #else
1303     while ( (status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
1304     {
1305 #endif
1306         inst_id = GNI_CQ_GET_INST_ID(event_data);
1307         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1308 #if PRINT_SYH
1309         printf("[%d] PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
1310 #endif
1311         if (useDynamicSMSG) {
1312             /* subtle: smsg may come before connection is setup */
1313             while (smsg_connected_flag[inst_id] != 2) 
1314                PumpDatagramConnection();
1315         }
1316         msg_tag = GNI_SMSG_ANY_TAG;
1317 #if CMK_SMP && !COMM_THREAD_SEND
1318         while(1) {
1319             CmiLock(tx_cq_lock);
1320             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1321             CmiUnlock(tx_cq_lock);
1322             if (status != GNI_RC_SUCCESS)
1323                 break;
1324 #else
1325         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS){
1326 #endif
1327             /* copy msg out and then put into queue (small message) */
1328             switch (msg_tag) {
1329             case SMALL_DATA_TAG:
1330             {
1331                 START_EVENT();
1332                 msg_nbytes = CmiGetMsgSize(header);
1333                 msg_data    = CmiAlloc(msg_nbytes);
1334                 memcpy(msg_data, (char*)header, msg_nbytes);
1335 #if CMK_SMP_TRACE_COMMTHREAD
1336                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1337 #endif
1338                 handleOneRecvedMsg(msg_nbytes, msg_data);
1339                 break;
1340             }
1341             case LMSG_INIT_TAG:
1342             {
1343 #if PRINT_SYH
1344                 printf("[%d] from %d request for Large msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
1345 #endif
1346                 getLargeMsgRequest(header, inst_id);
1347                 break;
1348             }
1349             case ACK_TAG:   //msg fit into mempool
1350             {
1351                 /* Get is done, release message . Now put is not used yet*/
1352                 void *msg = (void*)(((CONTROL_MSG *)header)->source_addr);
1353 #if ! USE_LRTS_MEMPOOL
1354                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
1355 #else
1356                 DecreaseMsgInSend(msg);
1357 #endif
1358                 if(NoMsgInSend(msg))
1359                     buffered_send_msg -= GetMempoolsize(msg);
1360                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool, msg_tag); 
1361                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
1362                 break;
1363             }
1364             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1365             {
1366                 header_tmp = (CONTROL_MSG *) header;
1367                 void *msg = (void*)(header_tmp->source_addr);
1368                 int cur_seq = CmiGetMsgSeq(msg);
1369                 int offset = ONE_SEG*(cur_seq+1);
1370                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh);
1371                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1372                 if (remain_size < 0) remain_size = 0;
1373                 CmiSetMsgSize(msg, remain_size);
1374                 if(remain_size <= 0) //transaction done
1375                 {
1376                     CmiFree(msg);
1377                 }else if (header_tmp->total_length > offset)
1378                 {
1379                     CmiSetMsgSeq(msg, cur_seq+1);
1380                     MallocControlMsg(control_msg_tmp);
1381                     control_msg_tmp->source_addr = header_tmp->source_addr;
1382                     control_msg_tmp->dest_addr    = header_tmp->dest_addr;
1383                     control_msg_tmp->total_length   = header_tmp->total_length; 
1384                     control_msg_tmp->length         = header_tmp->total_length - offset;
1385                     if (control_msg_tmp->length >= ONE_SEG) control_msg_tmp->length = ONE_SEG;
1386                     control_msg_tmp->seq_id         = cur_seq+1+1;
1387                     //send next seg
1388                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1389                          // pipelining
1390                     if (header_tmp->seq_id == 1) {
1391                       int i;
1392                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1393                         int seq = cur_seq+i+2;
1394                         CmiSetMsgSeq(msg, seq-1);
1395                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1396                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1397                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1398                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1399                       }
1400                     }
1401                 }
1402                 break;
1403             }
1404 #if CMK_PERSISTENT_COMM
1405             case PUT_DONE_TAG: //persistent message
1406                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1407                 int size = ((CONTROL_MSG *) header)->length;
1408                 CmiReference(msg);
1409                 handleOneRecvedMsg(size, msg); 
1410                 break;
1411 #endif
1412 #ifdef CMK_DIRECT
1413             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1414                 (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1415                 break;
1416 #endif
1417             default: {
1418                 printf("weird tag problem\n");
1419                 CmiAbort("Unknown tag\n");
1420                      }
1421             }
1422 #if CMK_SMP && !COMM_THREAD_SEND
1423             CmiLock(tx_cq_lock);
1424 #endif
1425             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1426 #if CMK_SMP && !COMM_THREAD_SEND
1427             CmiUnlock(tx_cq_lock);
1428 #endif
1429             msg_tag = GNI_SMSG_ANY_TAG;
1430         } //endwhile getNext
1431     }   //end while GetEvent
1432     if(status == GNI_RC_ERROR_RESOURCE)
1433     {
1434         GNI_RC_CHECK("Smsg_rx_cq full", status);
1435     }
1436 }
1437
1438 static void printDesc(gni_post_descriptor_t *pd)
1439 {
1440     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1441 }
1442
1443 // for BIG_MSG called on receiver side for receiving control message
1444 // LMSG_INIT_TAG
1445 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1446 {
1447 #if     USE_LRTS_MEMPOOL
1448     CONTROL_MSG         *request_msg;
1449     gni_return_t        status = GNI_RC_SUCCESS;
1450     void                *msg_data;
1451     gni_post_descriptor_t *pd;
1452     gni_mem_handle_t    msg_mem_hndl;
1453     int source, size, transaction_size, offset = 0;
1454     int     register_size = 0;
1455
1456     // initial a get to transfer data from the sender side */
1457     request_msg = (CONTROL_MSG *) header;
1458     size = request_msg->total_length;
1459     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool); 
1460     if(request_msg->seq_id < 2)   {
1461         msg_data = CmiAlloc(size);
1462         CmiSetMsgSeq(msg_data, 0);
1463         _MEMCHECK(msg_data);
1464     }
1465     else {
1466         offset = ONE_SEG*(request_msg->seq_id-1);
1467         msg_data = (char*)request_msg->dest_addr + offset;
1468     }
1469    
1470     MallocPostDesc(pd);
1471     pd->cqwrite_value = request_msg->seq_id;
1472     if( request_msg->seq_id == 0)
1473     {
1474         pd->local_mem_hndl= GetMemHndl(msg_data);
1475         transaction_size = ALIGN64(size);
1476         if(IsMemHndlZero(pd->local_mem_hndl))
1477         {   
1478             status = registerMempool((void*)(msg_data));
1479             if(status == GNI_RC_SUCCESS)
1480             {
1481                 pd->local_mem_hndl = GetMemHndl(msg_data);
1482             }
1483             else
1484             {
1485                 SetMemHndlZero(pd->local_mem_hndl);
1486             }
1487         }
1488         if(NoMsgInRecv( (void*)(msg_data)))
1489             register_size = GetMempoolsize((void*)(msg_data));
1490         else
1491             register_size = 0;
1492     }
1493     else{
1494         transaction_size = ALIGN64(request_msg->length);
1495         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, transaction_size, &(pd->local_mem_hndl), &omdh);
1496         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1497         {
1498             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1499         }
1500     }
1501     pd->first_operand = ALIGN64(size);                   //  total length
1502
1503     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1504         pd->type            = GNI_POST_FMA_GET;
1505     else
1506         pd->type            = GNI_POST_RDMA_GET;
1507 #if REMOTE_EVENT
1508     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1509 #else
1510     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1511 #endif
1512     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1513     pd->length          = transaction_size;
1514     pd->local_addr      = (uint64_t) msg_data;
1515     pd->remote_addr     = request_msg->source_addr + offset;
1516     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1517     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1518     pd->rdma_mode       = 0;
1519     pd->amo_cmd         = 0;
1520
1521     //memory registration success
1522     if(status == GNI_RC_SUCCESS)
1523     {
1524 #if CMK_SMP && !COMM_THREAD_SEND
1525         CmiLock(tx_cq_lock);
1526 #endif
1527         if(pd->type == GNI_POST_RDMA_GET) 
1528             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1529         else
1530             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1531 #if CMK_SMP && !COMM_THREAD_SEND
1532         CmiUnlock(tx_cq_lock);
1533 #endif
1534          
1535         if(status == GNI_RC_SUCCESS )
1536         {
1537             if(pd->cqwrite_value == 0)
1538             {
1539 #if MACHINE_DEBUG_LOG
1540                 buffered_recv_msg += register_size;
1541                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool); 
1542 #endif
1543                 IncreaseMsgInRecv(msg_data);
1544
1545             }
1546         }
1547     }else
1548     {
1549         SetMemHndlZero(pd->local_mem_hndl);
1550     }
1551     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1552     {
1553         bufferRdmaMsg(inst_id, pd); 
1554     }else {
1555         /* printf("source: %d pd:%p\n", source, pd); */
1556         GNI_RC_CHECK("AFter posting", status);
1557     }
1558 #else
1559     CONTROL_MSG         *request_msg;
1560     gni_return_t        status;
1561     void                *msg_data;
1562     gni_post_descriptor_t *pd;
1563     RDMA_REQUEST        *rdma_request_msg;
1564     gni_mem_handle_t    msg_mem_hndl;
1565     //int source;
1566     // initial a get to transfer data from the sender side */
1567     request_msg = (CONTROL_MSG *) header;
1568     msg_data = CmiAlloc(request_msg->length);
1569     _MEMCHECK(msg_data);
1570
1571     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
1572
1573     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1574     {
1575         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1576     }
1577
1578     MallocPostDesc(pd);
1579     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1580         pd->type            = GNI_POST_FMA_GET;
1581     else
1582         pd->type            = GNI_POST_RDMA_GET;
1583 #if REMOTE_EVENT
1584     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1585 #else
1586     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1587 #endif
1588     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1589     pd->length          = ALIGN64(request_msg->length);
1590     pd->local_addr      = (uint64_t) msg_data;
1591     pd->remote_addr     = request_msg->source_addr;
1592     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1593     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1594     pd->rdma_mode       = 0;
1595     pd->amo_cmd         = 0;
1596
1597     //memory registration successful
1598     if(status == GNI_RC_SUCCESS)
1599     {
1600         pd->local_mem_hndl  = msg_mem_hndl;
1601 #if CMK_SMP && !COMM_THREAD_SEND
1602             CmiLock(tx_cq_lock);
1603 #endif
1604         if(pd->type == GNI_POST_RDMA_GET) 
1605             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1606         else
1607             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1608 #if CMK_SMP && !COMM_THREAD_SEND
1609             CmiUnlock(tx_cq_lock);
1610 #endif
1611     }else
1612     {
1613         SetMemHndlZero(pd->local_mem_hndl);
1614     }
1615     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1616     {
1617         MallocRdmaRequest(rdma_request_msg);
1618         rdma_request_msg->next = 0;
1619         rdma_request_msg->destNode = inst_id;
1620         rdma_request_msg->pd = pd;
1621         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1622     }else {
1623         GNI_RC_CHECK("AFter posting", status);
1624     }
1625 #endif
1626 }
1627
1628 static void PumpLocalRdmaTransactions()
1629 {
1630     gni_cq_entry_t          ev;
1631     gni_return_t            status;
1632     uint64_t                type, inst_id;
1633     gni_post_descriptor_t   *tmp_pd;
1634     MSG_LIST                *ptr;
1635     CONTROL_MSG             *ack_msg_tmp;
1636     uint8_t             msg_tag;
1637 #ifdef CMK_DIRECT
1638     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1639 #endif
1640     SMSG_QUEUE         *queue = &smsg_queue;
1641
1642 #if CMK_SMP && !COMM_THREAD_SEND
1643     while(1) {
1644         CmiLock(tx_cq_lock);
1645         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1646         CmiUnlock(tx_cq_lock);
1647         if(status != GNI_RC_SUCCESS) break;
1648 #else
1649     while( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1650     {
1651 #endif
1652         type = GNI_CQ_GET_TYPE(ev);
1653         if (type == GNI_CQ_EVENT_TYPE_POST)
1654         {
1655             inst_id     = GNI_CQ_GET_INST_ID(ev);
1656 #if PRINT_SYH
1657             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1658 #endif
1659 #if CMK_SMP && !COMM_THREAD_SEND
1660             CmiLock(tx_cq_lock);
1661 #endif
1662             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1663 #if CMK_SMP && !COMM_THREAD_SEND
1664             CmiUnlock(tx_cq_lock);
1665 #endif
1666 #ifdef CMK_DIRECT
1667             if(tmp_pd->amo_cmd == 1)
1668             {
1669                 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1670                 cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
1671                 cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
1672                 cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
1673             }
1674             else{
1675                 MallocControlMsg(ack_msg_tmp);
1676                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1677                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1678             }
1679 #else
1680             MallocControlMsg(ack_msg_tmp);
1681             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1682             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1683 #endif
1684             ////Message is sent, free message , put is not used now
1685             switch (tmp_pd->type) {
1686 #if CMK_PERSISTENT_COMM
1687             case GNI_POST_RDMA_PUT:
1688 #if ! USE_LRTS_MEMPOOL
1689                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1690 #endif
1691             case GNI_POST_FMA_PUT:
1692                 CmiFree((void *)tmp_pd->local_addr);
1693                 msg_tag = PUT_DONE_TAG;
1694                 break;
1695 #endif
1696 #ifdef CMK_DIRECT
1697             case GNI_POST_RDMA_PUT:
1698             case GNI_POST_FMA_PUT:
1699                 //sender ACK to receiver to trigger it is done
1700                 msg_tag = DIRECT_PUT_DONE_TAG;
1701                 break;
1702 #endif
1703             case GNI_POST_RDMA_GET:
1704             case GNI_POST_FMA_GET:
1705 #if  ! USE_LRTS_MEMPOOL
1706                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1707                 msg_tag = ACK_TAG;  
1708 #else
1709                 ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
1710                 if(ack_msg_tmp->seq_id > 0)      // BIG_MSG
1711                 {
1712                     msg_tag = BIG_MSG_TAG; 
1713                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1714                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1715                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1716                 } 
1717                 else
1718                 {
1719                     msg_tag = ACK_TAG;  
1720                     ack_msg_tmp->dest_addr = tmp_pd->local_addr;
1721                 }
1722                 ack_msg_tmp->length = tmp_pd->length;
1723                 ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1724 #endif
1725                 break;
1726             default:
1727                 CmiPrintf("type=%d\n", tmp_pd->type);
1728                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1729             }
1730 #if CMK_DIRECT
1731             if(tmp_pd->amo_cmd == 1)
1732                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1733             else
1734 #endif
1735                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
1736             if(status == GNI_RC_SUCCESS)
1737             {
1738 #if CMK_DIRECT
1739                 if(tmp_pd->amo_cmd == 1)
1740                     free(cmk_direct_done_msg); 
1741                 else
1742 #endif
1743                     FreeControlMsg(ack_msg_tmp);
1744
1745             }
1746 #if CMK_PERSISTENT_COMM
1747             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1748 #endif
1749             {
1750                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1751 #if PRINT_SYH
1752                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1753 #endif
1754 #if CMK_SMP_TRACE_COMMTHREAD
1755                     START_EVENT();
1756 #endif
1757                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1758                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1759 #if MACHINE_DEBUG_LOG
1760                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1761                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1762                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool, msg_tag); 
1763 #endif
1764 #if CMK_SMP_TRACE_COMMTHREAD
1765                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1766 #endif
1767                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1768                 }else if(msg_tag == BIG_MSG_TAG){
1769                     void *msg = (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1770                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1771                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1772 #if CMK_SMP_TRACE_COMMTHREAD
1773                         START_EVENT();
1774 #endif
1775 #if PRINT_SYH
1776                         printf("Pipeline msg done [%d]\n", myrank);
1777 #endif
1778 #if CMK_SMP_TRACE_COMMTHREAD
1779                         TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1780 #endif
1781                         handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1782                     }
1783                 }
1784             }
1785             FreePostDesc(tmp_pd);
1786         }
1787     } //end while
1788     if(status == GNI_RC_ERROR_RESOURCE)
1789     {
1790         GNI_RC_CHECK("Smsg_tx_cq full", status);
1791     }
1792 }
1793
1794 static void  SendRdmaMsg()
1795 {
1796     gni_return_t            status = GNI_RC_SUCCESS;
1797     gni_mem_handle_t        msg_mem_hndl;
1798     RDMA_REQUEST *ptr = 0, *tmp_ptr;
1799     RDMA_REQUEST *pre = 0;
1800     int i, register_size = 0;
1801
1802 #if CMK_SMP
1803     int len = PCQueueLength(sendRdmaBuf);
1804     for (i=0; i<len; i++)
1805     {
1806         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
1807         if (ptr == NULL) break;
1808 #else
1809     ptr = sendRdmaBuf;
1810     while (ptr!=0)
1811     {
1812 #endif 
1813         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_mempool); 
1814         gni_post_descriptor_t *pd = ptr->pd;
1815         status = GNI_RC_SUCCESS;
1816         
1817         if(pd->cqwrite_value == 0)
1818         {
1819             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
1820             {
1821                 status = registerMempool((void*)(pd->local_addr));
1822                 if(status == GNI_RC_SUCCESS)
1823                 {
1824                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1825                 }
1826             }else
1827             {
1828                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1829             }
1830             if(NoMsgInRecv( (void*)(pd->local_addr)))
1831                 register_size = GetMempoolsize((void*)(pd->local_addr));
1832             else
1833                 register_size = 0;
1834         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
1835         {
1836             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1837         }
1838         if(status == GNI_RC_SUCCESS)        //mem register good
1839         {
1840 #if CMK_SMP && !COMM_THREAD_SEND
1841             CmiLock(tx_cq_lock);
1842 #endif
1843             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1844                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1845             else
1846                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1847 #if CMK_SMP && !COMM_THREAD_SEND
1848             CmiUnlock(tx_cq_lock);
1849 #endif
1850
1851             if(status == GNI_RC_SUCCESS)    //post good
1852             {
1853 #if !CMK_SMP
1854                 tmp_ptr = ptr;
1855                 if(pre != 0) {
1856                     pre->next = ptr->next;
1857                 }
1858                 else {
1859                     sendRdmaBuf = ptr->next;
1860                 }
1861                 ptr = ptr->next;
1862                 FreeRdmaRequest(tmp_ptr);
1863 #endif
1864                 if(pd->cqwrite_value == 0)
1865                 {
1866                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
1867                 }
1868 #if MACHINE_DEBUG_LOG
1869                 buffered_recv_msg += register_size;
1870                 MACHSTATE(8, "GO request from buffered\n"); 
1871 #endif
1872             }else           // cannot post
1873             {
1874 #if CMK_SMP
1875                 PCQueuePush(sendRdmaBuf, (char*)ptr);
1876 #else
1877                 pre = ptr;
1878                 ptr = ptr->next;
1879 #endif
1880                 break;
1881             }
1882         } else          //memory registration fails
1883         {
1884 #if CMK_SMP
1885             PCQueuePush(sendRdmaBuf, (char*)ptr);
1886 #else
1887             pre = ptr;
1888             ptr = ptr->next;
1889 #endif
1890         }
1891     } //end while
1892 #if ! CMK_SMP
1893     if(ptr == 0)
1894         sendRdmaTail = pre;
1895 #endif
1896 }
1897
1898 // return 1 if all messages are sent
1899 static int SendBufferMsg(SMSG_QUEUE *queue)
1900 {
1901     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
1902     CONTROL_MSG         *control_msg_tmp;
1903     gni_return_t        status;
1904     int                 done = 1;
1905     int                 register_size;
1906     void                *register_addr;
1907     int                 index_previous = -1;
1908     int                 index = queue->smsg_head_index;
1909 #if CMI_EXERT_SEND_CAP
1910     int                 sent_cnt = 0;
1911 #endif
1912
1913 #if ! CMK_SMP
1914     index = queue->smsg_head_index;
1915 #else
1916     index = 0;
1917 #endif
1918 #if CMK_SMP
1919     while(index <mysize)
1920     {
1921         int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
1922         for (i=0; i<len; i++) 
1923         {
1924             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
1925             if (ptr == NULL) break;
1926 #else
1927     while(index != -1)
1928     {
1929         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
1930         pre = 0;
1931         while(ptr != 0)
1932         {
1933 #endif
1934             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_mempool, ptr->tag); 
1935             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
1936                 /* connection not exists yet */
1937               done = 0;
1938               break;
1939             }
1940             status = GNI_RC_ERROR_RESOURCE;
1941             switch(ptr->tag)
1942             {
1943             case SMALL_DATA_TAG:
1944                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
1945                 if(status == GNI_RC_SUCCESS)
1946                 {
1947                     CmiFree(ptr->msg);
1948                 }
1949                 break;
1950             case LMSG_INIT_TAG:
1951                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1952                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
1953                 if(status != GNI_RC_SUCCESS)
1954                     done = 0;
1955                 break;
1956             case   ACK_TAG:
1957             case   BIG_MSG_TAG:
1958                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
1959                 if(status == GNI_RC_SUCCESS)
1960                 {
1961                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1962                 }else
1963                     done = 0;
1964                 break;
1965 #ifdef CMK_DIRECT
1966             case DIRECT_PUT_DONE_TAG:
1967                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
1968                 if(status == GNI_RC_SUCCESS)
1969                 {
1970                     free((CMK_DIRECT_HEADER*)ptr->msg);
1971                 }
1972                 break;
1973
1974 #endif
1975             default:
1976                 printf("Weird tag\n");
1977                 CmiAbort("should not happen\n");
1978             }
1979             if(status == GNI_RC_SUCCESS)
1980             {
1981 #if !CMK_SMP
1982                 tmp_ptr = ptr;
1983                 if(pre)
1984                 {
1985                     ptr = pre ->next = ptr->next;
1986                 }else
1987                 {
1988                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
1989                 }
1990                 FreeMsgList(tmp_ptr);
1991 #else
1992                 FreeMsgList(ptr);
1993 #endif
1994 #if PRINT_SYH
1995                 buffered_smsg_counter--;
1996                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
1997 #endif
1998 #if CMI_EXERT_SEND_CAP
1999                 sent_cnt++;
2000                 if(sent_cnt == SEND_CAP)
2001                     break;
2002 #endif
2003             }else {
2004 #if CMK_SMP
2005                 PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
2006 #else
2007                 pre = ptr;
2008                 ptr=ptr->next;
2009 #endif
2010                 done = 0;
2011                 if(status == GNI_RC_ERROR_RESOURCE)
2012                     break;
2013             } 
2014         } //end while
2015 #if !CMK_SMP
2016         if(ptr == 0)
2017             queue->smsg_msglist_index[index].tail = pre;
2018         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2019         {
2020             if(index_previous != -1)
2021                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2022             else
2023                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2024         }else
2025         {
2026             index_previous = index;
2027         }
2028         index = queue->smsg_msglist_index[index].next;
2029 #else
2030         index++;
2031 #endif
2032
2033 #if CMI_EXERT_SEND_CAP
2034         if(sent_cnt == SEND_CAP)
2035                 break;
2036 #endif
2037     }   // end pooling for all cores
2038     return done;
2039 }
2040
2041 void LrtsAdvanceCommunication(int whileidle)
2042 {
2043     /*  Receive Msg first */
2044     MACHSTATE(8, "calling advance comm \n") ; 
2045 #if CMK_SMP_TRACE_COMMTHREAD
2046     double startT, endT;
2047 #endif
2048     if (useDynamicSMSG && whileidle)
2049     {
2050 #if CMK_SMP_TRACE_COMMTHREAD
2051         startT = CmiWallTimer();
2052 #endif
2053         PumpDatagramConnection();
2054 #if CMK_SMP_TRACE_COMMTHREAD
2055         endT = CmiWallTimer();
2056         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2057 #endif
2058     }
2059
2060 #if CMK_SMP_TRACE_COMMTHREAD
2061     startT = CmiWallTimer();
2062 #endif
2063     PumpNetworkSmsg();
2064     MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2065 #if CMK_SMP_TRACE_COMMTHREAD
2066     endT = CmiWallTimer();
2067     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2068 #endif
2069
2070 #if CMK_SMP_TRACE_COMMTHREAD
2071     startT = CmiWallTimer();
2072 #endif
2073     PumpLocalRdmaTransactions();
2074     MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2075 #if CMK_SMP_TRACE_COMMTHREAD
2076     endT = CmiWallTimer();
2077     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2078 #endif
2079     /* Send buffered Message */
2080 #if CMK_SMP_TRACE_COMMTHREAD
2081     startT = CmiWallTimer();
2082 #endif
2083 #if USE_OOB
2084     SendBufferMsg(&smsg_oob_queue);
2085 #endif
2086     SendBufferMsg(&smsg_queue);
2087     MACHSTATE(8, "after SendBufferMsg\n") ; 
2088 #if CMK_SMP_TRACE_COMMTHREAD
2089     endT = CmiWallTimer();
2090     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2091 #endif
2092
2093 #if CMK_SMP_TRACE_COMMTHREAD
2094     startT = CmiWallTimer();
2095 #endif
2096     SendRdmaMsg();
2097     MACHSTATE(8, "after SendRdmaMsg\n") ; 
2098 #if CMK_SMP_TRACE_COMMTHREAD
2099     endT = CmiWallTimer();
2100     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2101 #endif
2102 }
2103
2104 /* useDynamicSMSG */
2105 static void _init_dynamic_smsg()
2106 {
2107     gni_return_t status;
2108     uint32_t     vmdh_index = -1;
2109     int i;
2110
2111     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2112     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2113     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2114     for(i=0; i<mysize; i++) {
2115         smsg_connected_flag[i] = 0;
2116         smsg_attr_vector_local[i] = NULL;
2117         smsg_attr_vector_remote[i] = NULL;
2118     }
2119     if(mysize <=512)
2120     {
2121         SMSG_MAX_MSG = 4096;
2122     }else if (mysize <= 4096)
2123     {
2124         SMSG_MAX_MSG = 4096/mysize * 1024;
2125     }else if (mysize <= 16384)
2126     {
2127         SMSG_MAX_MSG = 512;
2128     }else {
2129         SMSG_MAX_MSG = 256;
2130     }
2131
2132     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2133     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2134     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2135     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2136     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2137
2138     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2139     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2140     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2141     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2142     mailbox_list->offset = 0;
2143     mailbox_list->next = 0;
2144     
2145     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2146         mailbox_list->size, smsg_rx_cqh,
2147         GNI_MEM_READWRITE,   
2148         vmdh_index,
2149         &(mailbox_list->mem_hndl));
2150     GNI_RC_CHECK("MEMORY registration for smsg", status);
2151
2152     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2153     GNI_RC_CHECK("Unbound EP", status);
2154     
2155     alloc_smsg_attr(&send_smsg_attr);
2156
2157     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2158     GNI_RC_CHECK("post unbound datagram", status);
2159
2160       /* always pre-connect to proc 0 */
2161     //if (myrank != 0) connect_to(0);
2162 }
2163
2164 static void _init_static_smsg()
2165 {
2166     gni_smsg_attr_t      *smsg_attr;
2167     gni_smsg_attr_t      remote_smsg_attr;
2168     gni_smsg_attr_t      *smsg_attr_vec;
2169     gni_mem_handle_t     my_smsg_mdh_mailbox;
2170     int      ret, i;
2171     gni_return_t status;
2172     uint32_t              vmdh_index = -1;
2173     mdh_addr_t            base_infor;
2174     mdh_addr_t            *base_addr_vec;
2175     char *env;
2176
2177     if(mysize <=512)
2178     {
2179         SMSG_MAX_MSG = 1024;
2180     }else if (mysize <= 4096)
2181     {
2182         SMSG_MAX_MSG = 1024;
2183     }else if (mysize <= 16384)
2184     {
2185         SMSG_MAX_MSG = 512;
2186     }else {
2187         SMSG_MAX_MSG = 256;
2188     }
2189     
2190     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2191     if (env) SMSG_MAX_MSG = atoi(env);
2192     CmiAssert(SMSG_MAX_MSG > 0);
2193
2194     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2195     
2196     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2197     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2198     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2199     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2200     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2201     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2202     CmiAssert(ret == 0);
2203     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2204     //if (myrank == 0) printf("Charm++> allocates %.2fMB for SMSG mailbox. \n", smsg_memlen*mysize/1e6);
2205     
2206     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2207             smsg_memlen*(mysize), smsg_rx_cqh,
2208             GNI_MEM_READWRITE,   
2209             vmdh_index,
2210             &my_smsg_mdh_mailbox);
2211
2212     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2213
2214     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2215     base_infor.mdh =  my_smsg_mdh_mailbox;
2216     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2217
2218     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2219  
2220     for(i=0; i<mysize; i++)
2221     {
2222         if(i==myrank)
2223             continue;
2224         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2225         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2226         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2227         smsg_attr[i].mbox_offset = i*smsg_memlen;
2228         smsg_attr[i].buff_size = smsg_memlen;
2229         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2230         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2231     }
2232
2233     for(i=0; i<mysize; i++)
2234     {
2235         if (myrank == i) continue;
2236
2237         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2238         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2239         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2240         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2241         remote_smsg_attr.buff_size = smsg_memlen;
2242         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2243         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2244
2245         /* initialize the smsg channel */
2246         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2247         GNI_RC_CHECK("SMSG Init", status);
2248     } //end initialization
2249
2250     free(base_addr_vec);
2251     free(smsg_attr);
2252
2253     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2254     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2255
2256
2257 inline
2258 static void _init_send_queue(SMSG_QUEUE *queue)
2259 {
2260      int i;
2261      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2262      for(i =0; i<mysize; i++)
2263      {
2264         queue->smsg_msglist_index[i].next = -1;
2265 #if CMK_SMP
2266         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2267 #else
2268         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2269 #endif
2270         
2271      }
2272      queue->smsg_head_index = -1;
2273 }
2274
2275 inline
2276 static void _init_smsg()
2277 {
2278     if(mysize > 1) {
2279         if (useDynamicSMSG)
2280             _init_dynamic_smsg();
2281         else
2282             _init_static_smsg();
2283     }
2284
2285     _init_send_queue(&smsg_queue);
2286 #if USE_OOB
2287     _init_send_queue(&smsg_oob_queue);
2288 #endif
2289 }
2290
2291 static void _init_static_msgq()
2292 {
2293     gni_return_t status;
2294     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2295     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2296     msgq_attrs.smsg_q_sz = 1;
2297     msgq_attrs.rcv_pool_sz = 1;
2298     msgq_attrs.num_msgq_eps = 2;
2299     msgq_attrs.nloc_insts = 8;
2300     msgq_attrs.modes = 0;
2301     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2302
2303     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2304     GNI_RC_CHECK("MSGQ Init", status);
2305
2306
2307 }
2308
2309 #if CMK_SMP && STEAL_MEMPOOL
2310 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2311 {
2312     void *pool = NULL;
2313     int i, k;
2314     // check other ranks
2315     for (k=0; k<CmiMyNodeSize()+1; k++) {
2316         i = (CmiMyRank()+k)%CmiMyNodeSize();
2317         if (i==CmiMyRank()) continue;
2318         mempool_type *mptr = CpvAccessOther(mempool, i);
2319         CmiLock(mptr->mempoolLock);
2320         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2321         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2322             CmiUnlock(mptr->mempoolLock);
2323             continue;
2324         }
2325         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2326         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2327             /* search in the free list */
2328           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2329           mempool_header *current = free_header;
2330           while (current) {
2331             if (current->next_free == (char*)header-(char*)mptr) break;
2332             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2333           }
2334           if (current == NULL) {         /* not found in free list */
2335             CmiUnlock(mptr->mempoolLock);
2336             continue;
2337           }
2338 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2339             /* search the previous memblock, and remove the tail */
2340           mempool_block *ptr = (mempool_block *)mptr;
2341           while (ptr) {
2342             if (ptr->memblock_next == mptr->memblock_tail) break;
2343             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2344           }
2345           CmiAssert(ptr!=NULL);
2346           ptr->memblock_next = 0;
2347           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2348
2349             /* remove memblock from the free list */
2350           current->next_free = header->next_free;
2351           if (header == free_header) mptr->freelist_head = header->next_free;
2352
2353           CmiUnlock(mptr->mempoolLock);
2354
2355           pool = (void*)tail;
2356           *mem_hndl = tail->mem_hndl;
2357           *size = tail->size;
2358           return pool;
2359         }
2360         CmiUnlock(mptr->mempoolLock);
2361     }
2362
2363       /* steal failed, deregister and free memblock now */
2364     int freed = 0;
2365     for (k=0; k<CmiMyNodeSize()+1; k++) {
2366         i = (CmiMyRank()+k)%CmiMyNodeSize();
2367         mempool_type *mptr = CpvAccessOther(mempool, i);
2368         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2369
2370         mempool_block *mempools_head = &(mptr->mempools_head);
2371         mempool_block *current = mempools_head;
2372         mempool_block *prev = NULL;
2373
2374         while (current) {
2375           int isfree = 0;
2376           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2377 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2378           mempool_header *cur = free_header;
2379           mempool_header *header;
2380           if (current != mempools_head) {
2381             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2382              /* search in free list */
2383             if (header->size == current->size - sizeof(mempool_block)) {
2384               cur = free_header;
2385               while (cur) {
2386                 if (cur->next_free == (char*)header-(char*)mptr) break;
2387                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2388               }
2389               if (cur != NULL) isfree = 1;
2390             }
2391           }
2392           if (isfree) {
2393               /* remove from free list */
2394             cur->next_free = header->next_free;
2395             if (header == free_header) mptr->freelist_head = header->next_free;
2396              // deregister
2397             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh);
2398             GNI_RC_CHECK("steal Mempool de-register", status);
2399             mempool_block *ptr = current;
2400             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2401             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2402 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2403             freed += ptr->size;
2404             free(ptr);
2405              // try now
2406             if (freed > *size) {
2407               if (pool == NULL) {
2408                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2409                 CmiAssert(ret == 0);
2410               }
2411               status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh);
2412               if (status == GNI_RC_SUCCESS) {
2413                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2414 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2415                 return pool;
2416               }
2417 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2418             }
2419           }
2420           else {
2421              prev = current;
2422              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2423           }
2424         }
2425
2426         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2427     }
2428       /* still no luck registering pool */
2429     if (pool) free(pool);
2430     return NULL;
2431 }
2432 #endif
2433
2434 static long long int total_mempool_size = 0;
2435 static long long int total_mempool_calls = 0;
2436
2437 #if USE_LRTS_MEMPOOL
2438 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2439 {
2440     void *pool;
2441     int ret;
2442
2443     int default_size =  expand_flag? _expand_mem : _mempool_size;
2444     if (*size < default_size) *size = default_size;
2445     total_mempool_size += *size;
2446     total_mempool_calls += 1;
2447     ret = posix_memalign(&pool, ALIGNBUF, *size);
2448     if (ret != 0) {
2449 #if CMK_SMP && STEAL_MEMPOOL
2450       pool = steal_mempool_block(size, mem_hndl);
2451       if (pool != NULL) return pool;
2452 #endif
2453       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2454       if (ret == ENOMEM)
2455         CmiAbort("alloc_mempool_block: out of memory.");
2456       else
2457         CmiAbort("alloc_mempool_block: posix_memalign failed");
2458     }
2459         SetMemHndlZero((*mem_hndl));
2460     
2461     return pool;
2462 }
2463
2464 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2465 {
2466     if(!(IsMemHndlZero(mem_hndl)))
2467     {
2468         gni_return_t status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2469         GNI_RC_CHECK("free_mempool_block Mempool de-register", status);
2470     }
2471     free(ptr);
2472 }
2473 #endif
2474
2475 void LrtsPreCommonInit(int everReturn){
2476 #if USE_LRTS_MEMPOOL
2477     CpvInitialize(mempool_type*, mempool);
2478     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block);
2479     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2480 #endif
2481 }
2482
2483
2484 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2485 {
2486     register int            i;
2487     int                     rc;
2488     int                     device_id = 0;
2489     unsigned int            remote_addr;
2490     gni_cdm_handle_t        cdm_hndl;
2491     gni_return_t            status = GNI_RC_SUCCESS;
2492     uint32_t                vmdh_index = -1;
2493     uint8_t                 ptag;
2494     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2495     int                     first_spawned;
2496     int                     physicalID;
2497     char                   *env;
2498
2499     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2500     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2501     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2502    
2503     status = PMI_Init(&first_spawned);
2504     GNI_RC_CHECK("PMI_Init", status);
2505
2506     status = PMI_Get_size(&mysize);
2507     GNI_RC_CHECK("PMI_Getsize", status);
2508
2509     status = PMI_Get_rank(&myrank);
2510     GNI_RC_CHECK("PMI_getrank", status);
2511
2512     //physicalID = CmiPhysicalNodeID(myrank);
2513     
2514     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2515
2516     *myNodeID = myrank;
2517     *numNodes = mysize;
2518   
2519 #if !COMM_THREAD_SEND
2520     /* Currently, we only consider the case that comm. thread will only recv msgs */
2521     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2522 #endif
2523
2524     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2525     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2526     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2527
2528     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2529     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2530     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2531
2532     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
2533     if (env) useDynamicSMSG = 1;
2534     if (!useDynamicSMSG)
2535       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2536     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2537     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2538     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2539     
2540     if(myrank == 0)
2541     {
2542         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2543         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2544     }
2545 #ifdef USE_ONESIDED
2546     onesided_init(NULL, &onesided_hnd);
2547
2548     // this is a GNI test, so use the libonesided bypass functionality
2549     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2550     local_addr = gniGetNicAddress();
2551 #else
2552     ptag = get_ptag();
2553     cookie = get_cookie();
2554 #if 0
2555     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2556 #endif
2557     //Create and attach to the communication  domain */
2558     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2559     GNI_RC_CHECK("GNI_CdmCreate", status);
2560     //* device id The device id is the minor number for the device
2561     //that is assigned to the device by the system when the device is created.
2562     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2563     //where X is the device number 0 default 
2564     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2565     GNI_RC_CHECK("GNI_CdmAttach", status);
2566     local_addr = get_gni_nic_address(0);
2567 #endif
2568     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2569     _MEMCHECK(MPID_UGNI_AllAddr);
2570     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2571     /* create the local completion queue */
2572     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2573     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2574     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2575     
2576     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2577     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2578     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2579
2580     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2581     GNI_RC_CHECK("Create CQ (rx)", status);
2582     
2583     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2584     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2585     
2586     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2587     //GNI_RC_CHECK("Create BTE CQ", status);
2588
2589     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2590     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2591     _MEMCHECK(ep_hndl_array);
2592 #if CMK_SMP && !COMM_THREAD_SEND
2593     tx_cq_lock  = CmiCreateLock();
2594     rx_cq_lock  = CmiCreateLock();
2595 #endif
2596     for (i=0; i<mysize; i++) {
2597         if(i == myrank) continue;
2598         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2599         GNI_RC_CHECK("GNI_EpCreate ", status);   
2600         remote_addr = MPID_UGNI_AllAddr[i];
2601         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2602         GNI_RC_CHECK("GNI_EpBind ", status);   
2603     }
2604
2605     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2606     _init_smsg();
2607     PMI_Barrier();
2608
2609 #if     USE_LRTS_MEMPOOL
2610     env = getenv("CHARM_UGNI_MEMPOOL_SIZE");
2611     if (env) _mempool_size = CmiReadSize(env);
2612     if (CmiGetArgStringDesc(*argv,"+useMemorypoolSize",&env,"Set the memory pool size")) 
2613         _mempool_size = CmiReadSize(env);
2614
2615     if (myrank==0) printf("Charm++> memory pool size: %1.fMB\n", _mempool_size/1024.0/1024);
2616
2617     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2618     if (env) {
2619         MAX_REG_MEM = CmiReadSize(env);
2620         if (myrank==0) printf("Charm++> memory pool maximum size: %1.fMB\n", MAX_REG_MEM/1024.0/1024);
2621     }
2622
2623     env = getenv("CHARM_UGNI_SEND_MAX");
2624     if (env) {
2625         MAX_BUFF_SEND = CmiReadSize(env);
2626         if (myrank==0) printf("Charm++> maximum pending memory pool for sending: %1.fMB\n", MAX_BUFF_SEND/1024.0/1024);
2627     }
2628
2629     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2630     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2631 #endif
2632
2633     /* init DMA buffer for medium message */
2634
2635     //_init_DMA_buffer();
2636     
2637     free(MPID_UGNI_AllAddr);
2638 #if CMK_SMP
2639     sendRdmaBuf = PCQueueCreate();
2640 #else
2641     sendRdmaBuf = 0;
2642 #endif
2643 #if MACHINE_DEBUG_LOG
2644     char ln[200];
2645     sprintf(ln,"debugLog.%d",myrank);
2646     debugLog=fopen(ln,"w");
2647 #endif
2648
2649 }
2650
2651 void* LrtsAlloc(int n_bytes, int header)
2652 {
2653     void *ptr;
2654 #if 0
2655     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2656 #endif
2657     if(n_bytes <= SMSG_MAX_MSG)
2658     {
2659         int totalsize = n_bytes+header;
2660         ptr = malloc(totalsize);
2661     }
2662     else {
2663         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2664 #if     USE_LRTS_MEMPOOL
2665         n_bytes = ALIGN64(n_bytes);
2666         if(n_bytes <= BIG_MSG)
2667         {
2668             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2669             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2670         }else 
2671         {
2672             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2673             ptr = res + ALIGNBUF - header;
2674
2675         }
2676 #else
2677         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2678         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2679         ptr = res + ALIGNBUF - header;
2680 #endif
2681     }
2682     return ptr;
2683 }
2684
2685 void  LrtsFree(void *msg)
2686 {
2687     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2688     if (size <= SMSG_MAX_MSG)
2689       free(msg);
2690     else if(size>BIG_MSG)
2691     {
2692         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2693
2694     }else
2695     {
2696 #if     USE_LRTS_MEMPOOL
2697 #if CMK_SMP
2698         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2699 #else
2700         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2701 #endif
2702 #else
2703         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2704 #endif
2705     }
2706 }
2707
2708 void LrtsExit()
2709 {
2710     /* free memory ? */
2711 #if USE_LRTS_MEMPOOL
2712     mempool_destroy(CpvAccess(mempool));
2713 #endif
2714     PMI_Finalize();
2715     exit(0);
2716 }
2717
2718 void LrtsDrainResources()
2719 {
2720     if(mysize == 1) return;
2721     while (
2722 #if USE_OOB
2723            !SendBufferMsg(&smsg_oob_queue) ||
2724 #endif
2725            !SendBufferMsg(&smsg_queue))
2726     {
2727         if (useDynamicSMSG)
2728             PumpDatagramConnection();
2729         PumpNetworkSmsg();
2730         PumpLocalRdmaTransactions();
2731         SendRdmaMsg();
2732     }
2733     PMI_Barrier();
2734 }
2735
2736 void LrtsAbort(const char *message) {
2737     printf("CmiAbort is calling on PE:%d\n", myrank);
2738     CmiPrintStackTrace(0);
2739     PMI_Abort(-1, message);
2740 }
2741
2742 /**************************  TIMER FUNCTIONS **************************/
2743 #if CMK_TIMER_USE_SPECIAL
2744 /* MPI calls are not threadsafe, even the timer on some machines */
2745 static CmiNodeLock  timerLock = 0;
2746 static int _absoluteTime = 0;
2747 static int _is_global = 0;
2748 static struct timespec start_ts;
2749
2750 inline int CmiTimerIsSynchronized() {
2751     return 0;
2752 }
2753
2754 inline int CmiTimerAbsolute() {
2755     return _absoluteTime;
2756 }
2757
2758 double CmiStartTimer() {
2759     return 0.0;
2760 }
2761
2762 double CmiInitTime() {
2763     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
2764 }
2765
2766 void CmiTimerInit(char **argv) {
2767     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2768     if (_absoluteTime && CmiMyPe() == 0)
2769         printf("Charm++> absolute  timer is used\n");
2770     
2771     _is_global = CmiTimerIsSynchronized();
2772
2773
2774     if (_is_global) {
2775         if (CmiMyRank() == 0) {
2776             clock_gettime(CLOCK_MONOTONIC, &start_ts);
2777         }
2778     } else { /* we don't have a synchronous timer, set our own start time */
2779         CmiBarrier();
2780         CmiBarrier();
2781         CmiBarrier();
2782         clock_gettime(CLOCK_MONOTONIC, &start_ts);
2783     }
2784     CmiNodeAllBarrier();          /* for smp */
2785 }
2786
2787 /**
2788  * Since the timerLock is never created, and is
2789  * always NULL, then all the if-condition inside
2790  * the timer functions could be disabled right
2791  * now in the case of SMP.
2792  */
2793 double CmiTimer(void) {
2794     struct timespec now_ts;
2795     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2796     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2797         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2798 }
2799
2800 double CmiWallTimer(void) {
2801     struct timespec now_ts;
2802     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2803     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2804         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
2805 }
2806
2807 double CmiCpuTimer(void) {
2808     struct timespec now_ts;
2809     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2810     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2811         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2812 }
2813
2814 #endif
2815 /************Barrier Related Functions****************/
2816
2817 int CmiBarrier()
2818 {
2819     int status;
2820
2821 #if CMK_SMP
2822     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2823     CmiNodeAllBarrier();
2824     if (CmiMyRank() == CmiMyNodeSize())
2825 #else
2826     if (CmiMyRank() == 0)
2827 #endif
2828     {
2829         /**
2830          *  The call of CmiBarrier is usually before the initialization
2831          *  of trace module of Charm++, therefore, the START_EVENT
2832          *  and END_EVENT are disabled here. -Chao Mei
2833          */
2834         /*START_EVENT();*/
2835         status = PMI_Barrier();
2836         GNI_RC_CHECK("PMI_Barrier", status);
2837         /*END_EVENT(10);*/
2838     }
2839     CmiNodeAllBarrier();
2840     return status;
2841
2842 }
2843 #if CMK_DIRECT
2844 #include "machine-cmidirect.c"
2845 #endif
2846 #if CMK_PERSISTENT_COMM
2847 #include "machine-persistent.c"
2848 #endif
2849
2850