speedup sendbuffersmsg sendrdma
[charm.git] / src / arch / gemini_gni / machine.c
1
2 /** @file
3  * Gemini GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
10
11     export CHARM_UGNI_MEMPOOL_SIZE=8M
12     export CHARM_UGNI_MEMPOOL_MAX=20M
13     export CHARM_UGNI_SEND_MAX=10M
14
15     CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
16     CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
17  */
18 /*@{*/
19
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <stdint.h>
23 #include <errno.h>
24 #include <malloc.h>
25 #include <unistd.h>
26
27 #include "gni_pub.h"
28 #include "pmi.h"
29
30 #include "converse.h"
31 #include <time.h>
32
33 #define USE_OOB                     0
34
35 #define PRINT_SYH  0
36
37 // Trace communication thread
38 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
39 #define TRACE_THRESHOLD     0.00005
40 #define CMI_MPI_TRACE_MOREDETAILED 0
41 #undef CMI_MPI_TRACE_USEREVENTS
42 #define CMI_MPI_TRACE_USEREVENTS 1
43 #else
44 #undef CMK_SMP_TRACE_COMMTHREAD
45 #define CMK_SMP_TRACE_COMMTHREAD 0
46 #endif
47
48 #define CMK_TRACE_COMMOVERHEAD 0
49 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
50 #undef CMI_MPI_TRACE_USEREVENTS
51 #define CMI_MPI_TRACE_USEREVENTS 1
52 #else
53 #undef CMK_TRACE_COMMOVERHEAD
54 #define CMK_TRACE_COMMOVERHEAD 0
55 #endif
56
57 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
58 CpvStaticDeclare(double, projTraceStart);
59 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
60 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
61 #else
62 #define  START_EVENT()
63 #define  END_EVENT(x)
64 #endif
65
66 #define CMI_EXERT_SEND_CAP      0
67 #define CMI_EXERT_RECV_CAP      0
68
69 #if CMI_EXERT_SEND_CAP
70 #define SEND_CAP 16
71 #endif
72
73 #if CMI_EXERT_RECV_CAP
74 #define RECV_CAP 2
75 #endif
76
77
78
79 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
80
81 static int useDynamicSMSG  =0;               /* dynamic smsgs setup */
82
83 static int avg_smsg_connection = 32;
84 static int                 *smsg_connected_flag= 0;
85 static gni_smsg_attr_t     **smsg_attr_vector_local;
86 static gni_smsg_attr_t     **smsg_attr_vector_remote;
87 static gni_ep_handle_t     ep_hndl_unbound;
88 static gni_smsg_attr_t     send_smsg_attr;
89 static gni_smsg_attr_t     recv_smsg_attr;
90
91 typedef struct _dynamic_smsg_mailbox{
92    void     *mailbox_base;
93    int      size;
94    int      offset;
95    gni_mem_handle_t  mem_hndl;
96    struct      _dynamic_smsg_mailbox  *next;
97 }dynamic_smsg_mailbox_t;
98
99 static dynamic_smsg_mailbox_t  *mailbox_list;
100
101 #define REMOTE_EVENT                      0
102 #define USE_LRTS_MEMPOOL                  1
103
104 #if USE_LRTS_MEMPOOL
105 #if CMK_SMP
106 #define STEAL_MEMPOOL                     0
107 #endif
108
109 #define oneMB (1024ll*1024)
110 #if CMK_SMP
111 static CmiInt8 _mempool_size = 8*oneMB;
112 #else
113 static CmiInt8 _mempool_size = 32*oneMB;
114 #endif
115 static CmiInt8 _expand_mem =  4*oneMB;
116 #endif
117
118 //Dynamic flow control about memory registration
119 static CmiInt8  MAX_BUFF_SEND  =  2*oneMB*oneMB;
120 static CmiInt8  MAX_REG_MEM    =  2*oneMB*oneMB;
121 static CmiInt8 buffered_send_msg = 0;
122
123 #define BIG_MSG                  16*oneMB
124 #define ONE_SEG                  4*oneMB
125 #define BIG_MSG_PIPELINE         4
126
127 #if CMK_SMP
128 #define COMM_THREAD_SEND 1
129 #endif
130 int         rdma_id = 0;
131 #if PRINT_SYH
132 int         lrts_smsg_success = 0;
133 int         lrts_send_msg_id = 0;
134 int         lrts_send_rdma_success = 0;
135 int         lrts_local_done_msg = 0;
136
137 #endif
138 #include "machine.h"
139
140 #include "pcqueue.h"
141
142 #include "mempool.h"
143
144 #if CMK_PERSISTENT_COMM
145 #include "machine-persistent.h"
146 #endif
147
148 //#define  USE_ONESIDED 1
149 #ifdef USE_ONESIDED
150 //onesided implementation is wrong, since no place to restore omdh
151 #include "onesided.h"
152 onesided_hnd_t   onesided_hnd;
153 onesided_md_t    omdh;
154 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
155
156 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
157
158 #else
159 uint8_t   onesided_hnd, omdh;
160 #if REMOTE_EVENT
161 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl)
162 #else
163 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl)
164 #endif
165 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
166 #endif
167 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
168 #define   DecreaseMsgInRecv(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)--
169 #define   IncreaseMsgInSend(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)++
170                                    //printf("++++[%d]%p   ----- %d\n", myrank, ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
171 #define   DecreaseMsgInSend(x)   (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send)--
172                                      //printf("---[%d]%p   ----- %d\n", myrank,((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr)), (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send ));
173 #define   GetMempooladdr(x)  ((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr
174 #define   GetMempoolptr(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mptr
175 #define   GetMempoolsize(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->size
176 #define   GetMemHndl(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->mem_hndl
177 #define   GetMemHndlFromHeader(x) ((block_header*)x)->mem_hndl
178 #define   GetSizeFromHeader(x) ((block_header*)x)->size
179 #define   NoMsgInSend(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send == 0
180 #define   NoMsgInRecv(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv == 0
181 #define   NoMsgInFlight(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send + ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv  == 0
182 #define   IsMemHndlZero(x)  (x.qword1 == 0 && x.qword2 == 0)
183 #define   SetMemHndlZero(x)  x.qword1 = 0; x.qword2 = 0
184 #define   NotRegistered(x)  IsMemHndlZero(((block_header*)x)->mem_hndl)
185
186 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
187 #define CmiSetMsgSize(m,s)  ((((CmiMsgHeaderExt*)m)->size)=(s))
188 #define CmiGetMsgSeq(m)  ((CmiMsgHeaderExt*)m)->seq
189 #define CmiSetMsgSeq(m, s)  ((((CmiMsgHeaderExt*)m)->seq) = (s))
190
191 #define ALIGNBUF                64
192
193 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
194 /* If SMSG is not used */
195
196 #define FMA_PER_CORE  1024
197 #define FMA_BUFFER_SIZE 1024
198
199 /* If SMSG is used */
200 static int  SMSG_MAX_MSG = 1024;
201 #define SMSG_MAX_CREDIT 72 
202
203 #define MSGQ_MAXSIZE       2048
204 /* large message transfer with FMA or BTE */
205 #define LRTS_GNI_RDMA_THRESHOLD  1024 
206
207 #if CMK_SMP
208 static int  REMOTE_QUEUE_ENTRIES=163840; 
209 static int LOCAL_QUEUE_ENTRIES=163840; 
210 #else
211 static int  REMOTE_QUEUE_ENTRIES=20480;
212 static int LOCAL_QUEUE_ENTRIES=20480; 
213 #endif
214
215 typedef enum send_gni_return
216 {
217     SEND_GNI_SUCCESS=10,
218     SEND_GNI_NO_RESOURCE,
219     SEND_GNI_NO_MEM
220 }send_gni_return_t;
221
222 #define BIG_MSG_TAG  0x26
223 #define PUT_DONE_TAG      0x28
224 #define DIRECT_PUT_DONE_TAG      0x29
225 #define ACK_TAG           0x30
226 /* SMSG is data message */
227 #define SMALL_DATA_TAG          0x31
228 /* SMSG is a control message to initialize a BTE */
229 #define MEDIUM_HEAD_TAG         0x32
230 #define MEDIUM_DATA_TAG         0x33
231 #define LMSG_INIT_TAG           0x39 
232 #define VERY_LMSG_INIT_TAG      0x40 
233 #define VERY_LMSG_TAG           0x41 
234
235 #define DEBUG
236 #ifdef GNI_RC_CHECK
237 #undef GNI_RC_CHECK
238 #endif
239 #ifdef DEBUG
240 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
241 #else
242 #define GNI_RC_CHECK(msg,rc)
243 #endif
244
245 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
246 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
247
248 static int useStaticMSGQ = 0;
249 static int useStaticFMA = 0;
250 static int mysize, myrank;
251 gni_nic_handle_t      nic_hndl;
252
253 typedef struct {
254     gni_mem_handle_t mdh;
255     uint64_t addr;
256 } mdh_addr_t ;
257 // this is related to dynamic SMSG
258
259 typedef struct mdh_addr_list{
260     gni_mem_handle_t mdh;
261    void *addr;
262     struct mdh_addr_list *next;
263 }mdh_addr_list_t;
264
265 static unsigned int         smsg_memlen;
266 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
267 mdh_addr_t          setup_mem;
268 mdh_addr_t          *smsg_connection_vec = 0;
269 gni_mem_handle_t    smsg_connection_memhndl;
270 static int          smsg_expand_slots = 10;
271 static int          smsg_available_slot = 0;
272 static void         *smsg_mailbox_mempool = 0;
273 mdh_addr_list_t     *smsg_dynamic_list = 0;
274
275 static void             *smsg_mailbox_base;
276 gni_msgq_attr_t         msgq_attrs;
277 gni_msgq_handle_t       msgq_handle;
278 gni_msgq_ep_attr_t      msgq_ep_attrs;
279 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
280
281 /* =====Beginning of Declarations of Machine Specific Variables===== */
282 static int cookie;
283 static int modes = 0;
284 static gni_cq_handle_t       smsg_rx_cqh = NULL;
285 static gni_cq_handle_t       smsg_tx_cqh = NULL;
286 static gni_cq_handle_t       post_rx_cqh = NULL;
287 static gni_cq_handle_t       post_tx_cqh = NULL;
288 static gni_ep_handle_t       *ep_hndl_array;
289 #if CMK_SMP && !COMM_THREAD_SEND
290 static CmiNodeLock           *ep_lock_array;
291 static CmiNodeLock           tx_cq_lock; 
292 static CmiNodeLock           rx_cq_lock; 
293 #endif
294 typedef struct msg_list
295 {
296     uint32_t destNode;
297     uint32_t size;
298     void *msg;
299     uint8_t tag;
300 #if !CMK_SMP
301     struct msg_list *next;
302 #endif
303 }MSG_LIST;
304
305
306 typedef struct control_msg
307 {
308     uint64_t            source_addr;    /* address from the start of buffer  */
309     uint64_t            dest_addr;      /* address from the start of buffer */
310     //int                 source;         /* source rank */
311     int                 total_length;   /* total length */
312     int                 length;         /* length of this packet */
313     uint8_t             seq_id;         //big message   0 meaning single message
314     gni_mem_handle_t    source_mem_hndl;
315     struct control_msg *next;
316 }CONTROL_MSG;
317
318 #ifdef CMK_DIRECT
319 typedef struct{
320     void *recverBuf;
321     void (*callbackFnPtr)(void *);
322     void *callbackData;
323 }CMK_DIRECT_HEADER;
324 #endif
325 typedef struct  rmda_msg
326 {
327     int                   destNode;
328     gni_post_descriptor_t *pd;
329 #if !CMK_SMP
330     struct  rmda_msg      *next;
331 #endif
332 }RDMA_REQUEST;
333
334 #if CMK_SMP
335 PCQueue sendRdmaBuf;
336 typedef struct  msg_list_index
337 {
338     int         next;
339     PCQueue     sendSmsgBuf;
340 } MSG_LIST_INDEX;
341 #else
342 static RDMA_REQUEST        *sendRdmaBuf = 0;
343 static RDMA_REQUEST        *sendRdmaTail = 0;
344 typedef struct  msg_list_index
345 {
346     int         next;
347     MSG_LIST    *sendSmsgBuf;
348     MSG_LIST    *tail;
349 } MSG_LIST_INDEX;
350 #endif
351
352 /* reuse PendingMsg memory */
353 static CONTROL_MSG          *control_freelist=0;
354 static MSG_LIST             *msglist_freelist=0;
355
356 typedef struct smsg_queue
357 {
358     MSG_LIST_INDEX   *smsg_msglist_index;
359     int               smsg_head_index;
360 } SMSG_QUEUE;
361
362 SMSG_QUEUE                  smsg_queue;
363 SMSG_QUEUE                  smsg_oob_queue;
364
365 #if CMK_SMP
366
367 #define FreeMsgList(d)   free(d);
368 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
369
370 #else
371
372 #define FreeMsgList(d)  \
373   (d)->next = msglist_freelist;\
374   msglist_freelist = d;
375
376 #define MallocMsgList(d) \
377   d = msglist_freelist;\
378   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
379              _MEMCHECK(d);\
380   } else msglist_freelist = d->next; \
381   d->next =0;
382 #endif
383
384 #if CMK_SMP
385
386 #define FreeControlMsg(d)      free(d);
387 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
388
389 #else
390
391 #define FreeControlMsg(d)       \
392   (d)->next = control_freelist;\
393   control_freelist = d;
394
395 #define MallocControlMsg(d) \
396   d = control_freelist;\
397   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
398              _MEMCHECK(d);\
399   } else control_freelist = d->next;
400
401 #endif
402
403 static RDMA_REQUEST         *rdma_freelist = NULL;
404
405 #define FreeMediumControlMsg(d)       \
406   (d)->next = medium_control_freelist;\
407   medium_control_freelist = d;
408
409
410 #define MallocMediumControlMsg(d) \
411     d = medium_control_freelist;\
412     if (d==0) {d = ((MEDIUM_MSG_CONTROL*)malloc(sizeof(MEDIUM_MSG_CONTROL)));\
413     _MEMCHECK(d);\
414 } else mediumcontrol_freelist = d->next;
415
416 # if CMK_SMP
417 #define FreeRdmaRequest(d)       free(d);
418 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
419 #else
420
421 #define FreeRdmaRequest(d)       \
422   (d)->next = rdma_freelist;\
423   rdma_freelist = d;
424
425 #define MallocRdmaRequest(d) \
426   d = rdma_freelist;\
427   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
428              _MEMCHECK(d);\
429   } else rdma_freelist = d->next; \
430     d->next =0;
431 #endif
432
433 /* reuse gni_post_descriptor_t */
434 static gni_post_descriptor_t *post_freelist=0;
435
436 #if  !CMK_SMP
437 #define FreePostDesc(d)       \
438     (d)->next_descr = post_freelist;\
439     post_freelist = d;
440
441 #define MallocPostDesc(d) \
442   d = post_freelist;\
443   if (d==0) { \
444      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
445      _MEMCHECK(d);\
446   } else post_freelist = d->next_descr;
447 #else
448
449 #define FreePostDesc(d)     free(d);
450 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
451
452 #endif
453
454
455 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
456 static int      buffered_smsg_counter = 0;
457
458 /* SmsgSend return success but message sent is not confirmed by remote side */
459 static MSG_LIST *buffered_fma_head = 0;
460 static MSG_LIST *buffered_fma_tail = 0;
461
462 /* functions  */
463 #define IsFree(a,ind)  !( a& (1<<(ind) ))
464 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
465 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
466
467 CpvDeclare(mempool_type*, mempool);
468
469 /* get the upper bound of log 2 */
470 int mylog2(int size)
471 {
472     int op = size;
473     unsigned int ret=0;
474     unsigned int mask = 0;
475     int i;
476     while(op>0)
477     {
478         op = op >> 1;
479         ret++;
480
481     }
482     for(i=1; i<ret; i++)
483     {
484         mask = mask << 1;
485         mask +=1;
486     }
487
488     ret -= ((size &mask) ? 0:1);
489     return ret;
490 }
491
492 static void
493 allgather(void *in,void *out, int len)
494 {
495     static int *ivec_ptr=NULL,already_called=0,job_size=0;
496     int i,rc;
497     int my_rank;
498     char *tmp_buf,*out_ptr;
499
500     if(!already_called) {
501
502         rc = PMI_Get_size(&job_size);
503         CmiAssert(rc == PMI_SUCCESS);
504         rc = PMI_Get_rank(&my_rank);
505         CmiAssert(rc == PMI_SUCCESS);
506
507         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
508         CmiAssert(ivec_ptr != NULL);
509
510         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
511         CmiAssert(rc == PMI_SUCCESS);
512
513         already_called = 1;
514
515     }
516
517     tmp_buf = (char *)malloc(job_size * len);
518     CmiAssert(tmp_buf);
519
520     rc = PMI_Allgather(in,tmp_buf,len);
521     CmiAssert(rc == PMI_SUCCESS);
522
523     out_ptr = out;
524
525     for(i=0;i<job_size;i++) {
526
527         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
528
529     }
530
531     free(tmp_buf);
532 }
533
534 static void
535 allgather_2(void *in,void *out, int len)
536 {
537     //PMI_Allgather is out of order
538     int i,rc, extend_len;
539     int  rank_index;
540     char *out_ptr, *out_ref;
541     char *in2;
542
543     extend_len = sizeof(int) + len;
544     in2 = (char*)malloc(extend_len);
545
546     memcpy(in2, &myrank, sizeof(int));
547     memcpy(in2+sizeof(int), in, len);
548
549     out_ptr = (char*)malloc(mysize*extend_len);
550
551     rc = PMI_Allgather(in2, out_ptr, extend_len);
552     GNI_RC_CHECK("allgather", rc);
553
554     out_ref = out;
555
556     for(i=0;i<mysize;i++) {
557         //rank index 
558         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
559         //copy to the rank index slot
560         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
561     }
562
563     free(out_ptr);
564     free(in2);
565
566 }
567
568 static unsigned int get_gni_nic_address(int device_id)
569 {
570     unsigned int address, cpu_id;
571     gni_return_t status;
572     int i, alps_dev_id=-1,alps_address=-1;
573     char *token, *p_ptr;
574
575     p_ptr = getenv("PMI_GNI_DEV_ID");
576     if (!p_ptr) {
577         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
578        
579         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
580     } else {
581         while ((token = strtok(p_ptr,":")) != NULL) {
582             alps_dev_id = atoi(token);
583             if (alps_dev_id == device_id) {
584                 break;
585             }
586             p_ptr = NULL;
587         }
588         CmiAssert(alps_dev_id != -1);
589         p_ptr = getenv("PMI_GNI_LOC_ADDR");
590         CmiAssert(p_ptr != NULL);
591         i = 0;
592         while ((token = strtok(p_ptr,":")) != NULL) {
593             if (i == alps_dev_id) {
594                 alps_address = atoi(token);
595                 break;
596             }
597             p_ptr = NULL;
598             ++i;
599         }
600         CmiAssert(alps_address != -1);
601         address = alps_address;
602     }
603     return address;
604 }
605
606 static uint8_t get_ptag(void)
607 {
608     char *p_ptr, *token;
609     uint8_t ptag;
610
611     p_ptr = getenv("PMI_GNI_PTAG");
612     CmiAssert(p_ptr != NULL);
613     token = strtok(p_ptr, ":");
614     ptag = (uint8_t)atoi(token);
615     return ptag;
616         
617 }
618
619 static uint32_t get_cookie(void)
620 {
621     uint32_t cookie;
622     char *p_ptr, *token;
623
624     p_ptr = getenv("PMI_GNI_COOKIE");
625     CmiAssert(p_ptr != NULL);
626     token = strtok(p_ptr, ":");
627     cookie = (uint32_t)atoi(token);
628
629     return cookie;
630 }
631
632 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
633 /* TODO: add any that are related */
634 /* =====End of Definitions of Message-Corruption Related Macros=====*/
635
636
637 #include "machine-lrts.h"
638 #include "machine-common-core.c"
639
640 /* Network progress function is used to poll the network when for
641    messages. This flushes receive buffers on some  implementations*/
642 #if CMK_MACHINE_PROGRESS_DEFINED
643 void CmiMachineProgressImpl() {
644 }
645 #endif
646
647 static void SendRdmaMsg();
648 static void PumpNetworkSmsg();
649 static void PumpLocalRdmaTransactions();
650 static int SendBufferMsg();
651 static     int register_mempool = 0;
652
653 #if MACHINE_DEBUG_LOG
654 FILE *debugLog = NULL;
655 static CmiInt8 buffered_recv_msg = 0;
656 int         lrts_smsg_success = 0;
657 int         lrts_received_msg = 0;
658 #endif
659 static void sweep_mempool(mempool_type *mptr)
660 {
661     block_header *current = &(mptr->block_head);
662
663 printf("[n %d] sweep_mempool slot START .\n", myrank);
664     while( current!= NULL) {
665 printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
666         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
667     }
668 printf("[n %d] sweep_mempool slot END .\n", myrank);
669 }
670
671 inline
672 static  gni_return_t deregisterMempool(mempool_type *mptr, block_header **from)
673 {
674     gni_return_t status;
675     block_header *current = *from;
676
677     while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
678        current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
679
680     *from = current;
681     if(current == NULL) return GNI_RC_ERROR_RESOURCE;
682     status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh);
683     GNI_RC_CHECK("registerMemorypool de-register", status);
684     register_mempool -= GetSizeFromHeader(current);
685     SetMemHndlZero(GetMemHndlFromHeader(current));
686     return GNI_RC_SUCCESS;
687 }
688
689 inline 
690 static gni_return_t registerMempool(void *msg)
691 {
692     gni_return_t status = GNI_RC_SUCCESS;
693     int size = GetMempoolsize(msg);
694     void *addr = GetMempooladdr(msg);
695     gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
696    
697     mempool_type *mptr = (mempool_type*)GetMempoolptr(msg);
698     block_header *current = &(mptr->block_head);
699
700     while(register_mempool > MAX_REG_MEM)
701     {
702         status = deregisterMempool(mptr, &current);
703         if (status != GNI_RC_SUCCESS) break;
704     };
705     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_mempool); 
706     while(1)
707     {
708         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, addr, size, memhndl, &omdh);
709         if(status == GNI_RC_SUCCESS)
710         {
711             register_mempool += size;
712             break;
713         }
714         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
715         {
716             CmiAbort("Memory registor for mempool fails\n");
717         }
718         else
719         {
720             status = deregisterMempool(mptr, &current);
721             if (status != GNI_RC_SUCCESS) break;
722         }
723     }; 
724     return status;
725 }
726
727 inline
728 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
729 {
730     MSG_LIST        *msg_tmp;
731     MallocMsgList(msg_tmp);
732     msg_tmp->destNode = destNode;
733     msg_tmp->size   = size;
734     msg_tmp->msg    = msg;
735     msg_tmp->tag    = tag;
736
737 #if !CMK_SMP
738     if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
739         queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
740         queue->smsg_head_index = destNode;
741         queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
742     }else
743     {
744         queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
745         queue->smsg_msglist_index[destNode].tail = msg_tmp;
746     }
747 #else
748     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
749 #endif
750 #if PRINT_SYH
751     buffered_smsg_counter++;
752 #endif
753 }
754
755 inline static void print_smsg_attr(gni_smsg_attr_t     *a)
756 {
757     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
758 }
759
760 inline
761 static void setup_smsg_connection(int destNode)
762 {
763     mdh_addr_list_t  *new_entry = 0;
764     gni_post_descriptor_t *pd;
765     gni_smsg_attr_t      *smsg_attr;
766     gni_return_t status = GNI_RC_NOT_DONE;
767     RDMA_REQUEST        *rdma_request_msg;
768     
769     if(smsg_available_slot == smsg_expand_slots)
770     {
771         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
772         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
773         bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
774
775         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
776             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
777             GNI_MEM_READWRITE,   
778             -1,
779             &(new_entry->mdh));
780         smsg_available_slot = 0; 
781         new_entry->next = smsg_dynamic_list;
782         smsg_dynamic_list = new_entry;
783     }
784     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
785     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
786     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
787     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
788     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
789     smsg_attr->buff_size = smsg_memlen;
790     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
791     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
792     smsg_local_attr_vec[destNode] = smsg_attr;
793     smsg_available_slot++;
794     MallocPostDesc(pd);
795     pd->type            = GNI_POST_FMA_PUT;
796     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
797     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
798     pd->length          = sizeof(gni_smsg_attr_t);
799     pd->local_addr      = (uint64_t) smsg_attr;
800     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
801     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
802     pd->src_cq_hndl     = 0;
803     pd->rdma_mode       = 0;
804     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
805     print_smsg_attr(smsg_attr);
806     if(status == GNI_RC_ERROR_RESOURCE )
807     {
808         MallocRdmaRequest(rdma_request_msg);
809         rdma_request_msg->destNode = destNode;
810         rdma_request_msg->pd = pd;
811         /* buffer this request */
812     }
813 #if PRINT_SYH
814     if(status != GNI_RC_SUCCESS)
815        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
816     else
817         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
818 #endif
819 }
820
821 /* useDynamicSMSG */
822 inline 
823 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
824 {
825     gni_return_t status = GNI_RC_NOT_DONE;
826
827     if(mailbox_list->offset == mailbox_list->size)
828     {
829         dynamic_smsg_mailbox_t *new_mailbox_entry;
830         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
831         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
832         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
833         bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
834         new_mailbox_entry->offset = 0;
835         
836         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
837             new_mailbox_entry->size, smsg_rx_cqh,
838             GNI_MEM_READWRITE,   
839             -1,
840             &(new_mailbox_entry->mem_hndl));
841
842         GNI_RC_CHECK("register", status);
843         new_mailbox_entry->next = mailbox_list;
844         mailbox_list = new_mailbox_entry;
845     }
846     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
847     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
848     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
849     local_smsg_attr->mbox_offset = mailbox_list->offset;
850     mailbox_list->offset += smsg_memlen;
851     local_smsg_attr->buff_size = smsg_memlen;
852     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
853     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
854 }
855
856 /* useDynamicSMSG */
857 inline 
858 static int connect_to(int destNode)
859 {
860     gni_return_t status = GNI_RC_NOT_DONE;
861     CmiAssert(smsg_connected_flag[destNode] == 0);
862     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
863     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
864     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
865     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
866             
867 #if CMK_SMP && !COMM_THREAD_SEND
868     //CmiLock(ep_lock_array[destNode]);
869     CmiLock(tx_cq_lock);
870 #endif
871     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
872 #if CMK_SMP && !COMM_THREAD_SEND
873     //CmiUnlock(ep_lock_array[destNode]);
874     CmiUnlock(tx_cq_lock);
875 #endif
876     if (status == GNI_RC_ERROR_RESOURCE) {
877       /* possibly destNode is making connection at the same time */
878       free(smsg_attr_vector_local[destNode]);
879       smsg_attr_vector_local[destNode] = NULL;
880       free(smsg_attr_vector_remote[destNode]);
881       smsg_attr_vector_remote[destNode] = NULL;
882       mailbox_list->offset -= smsg_memlen;
883       return 0;
884     }
885     GNI_RC_CHECK("GNI_Post", status);
886     smsg_connected_flag[destNode] = 1;
887     return 1;
888 }
889
890 //inline static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
891 inline static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
892 {
893     unsigned int          remote_address;
894     uint32_t              remote_id;
895     gni_return_t status = GNI_RC_NOT_DONE;
896     gni_smsg_attr_t      *smsg_attr;
897     gni_post_descriptor_t *pd;
898     gni_post_state_t      post_state;
899     char                  *real_data; 
900
901     if (useDynamicSMSG) {
902         switch (smsg_connected_flag[destNode]) {
903         case 0: 
904             connect_to(destNode);         /* continue to case 1 */
905         case 1:                           /* pending connection, do nothing */
906             status = GNI_RC_NOT_DONE;
907             if(inbuff ==0)
908                 buffer_small_msgs(queue, msg, size, destNode, tag);
909             return status;
910         }
911     }
912 #if CMK_SMP
913     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
914     {
915 #else
916     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
917     {
918 #endif
919 #if CMK_SMP && !COMM_THREAD_SEND
920         CmiLock(tx_cq_lock);
921 #endif
922         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
923 #if CMK_SMP && !COMM_THREAD_SEND
924         CmiUnlock(tx_cq_lock);
925 #endif
926         if(status == GNI_RC_SUCCESS)
927         {
928 #if CMK_SMP_TRACE_COMMTHREAD
929             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
930             { 
931                 START_EVENT();
932                 if ( tag == SMALL_DATA_TAG)
933                     real_data = (char*)msg; 
934                 else 
935                     real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
936                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
937             }
938 #endif
939             return status;
940         }
941     }
942     if(inbuff ==0)
943         buffer_small_msgs(queue, msg, size, destNode, tag);
944     return status;
945 }
946
947
948 inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
949 {
950     /* construct a control message and send */
951     CONTROL_MSG         *control_msg_tmp;
952     MallocControlMsg(control_msg_tmp);
953     control_msg_tmp->source_addr = (uint64_t)msg;
954     control_msg_tmp->seq_id    = seqno;
955     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
956 #if     USE_LRTS_MEMPOOL
957     if(size < BIG_MSG)
958     {
959         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
960     }
961     else
962     {
963         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
964         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
965         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
966     }
967 #else
968     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
969 #endif
970     return control_msg_tmp;
971 }
972
973 // Large message, send control to receiver, receiver register memory and do a GET, 
974 // return 1 - send no success
975 inline
976 static send_gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
977 {
978     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
979     send_gni_return_t   send_status = SEND_GNI_NO_MEM;
980     uint32_t            vmdh_index  = -1;
981     int                 size;
982     int                 offset = 0;
983     uint64_t            source_addr;
984     int                 register_size; 
985     size    =   control_msg_tmp->total_length;
986     source_addr = control_msg_tmp->source_addr;
987     register_size = control_msg_tmp->length;
988
989 #if     USE_LRTS_MEMPOOL
990     if( control_msg_tmp->seq_id == 0 ){
991         if(buffered_send_msg >= MAX_BUFF_SEND)
992         {
993             if(!inbuff)
994                 buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
995             return SEND_GNI_NO_MEM;
996         }
997         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
998         {
999             //register the corresponding mempool
1000             status = registerMempool((void*)source_addr);
1001             if(status == GNI_RC_SUCCESS)
1002             {
1003                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1004             }
1005         }else
1006         {
1007             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1008             status = GNI_RC_SUCCESS;
1009         }
1010         if(NoMsgInSend( (void*)(control_msg_tmp->source_addr)))
1011             register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
1012         else
1013             register_size = 0;
1014     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1015     {
1016         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1017         source_addr += offset;
1018         size = control_msg_tmp->length;
1019         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
1020         register_size = 0;  
1021     }
1022
1023     if(status == GNI_RC_SUCCESS)
1024     {
1025         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
1026         if(status == GNI_RC_SUCCESS)
1027         {
1028             buffered_send_msg += register_size;
1029             if(control_msg_tmp->seq_id == 0)
1030             {
1031                 IncreaseMsgInSend(source_addr);
1032             }
1033             FreeControlMsg(control_msg_tmp);
1034             send_status = SEND_GNI_SUCCESS;
1035             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_mempool, LMSG_INIT_TAG); 
1036         }else
1037             send_status = SEND_GNI_NO_RESOURCE;
1038
1039     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1040     {
1041         CmiAbort("Memory registor for large msg\n");
1042     }else 
1043     {
1044         if(!inbuff)
1045             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1046     }
1047     return send_status;
1048 #else
1049     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
1050     if(status == GNI_RC_SUCCESS)
1051     {
1052         status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
1053         if(status == GNI_RC_SUCCESS)
1054         {
1055             FreeControlMsg(control_msg_tmp);
1056         }
1057     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1058     {
1059         CmiAbort("Memory registor for large msg\n");
1060     }else 
1061     {
1062         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1063     }
1064     return status;
1065 #endif
1066 }
1067
1068 inline void LrtsPrepareEnvelope(char *msg, int size)
1069 {
1070     CmiSetMsgSize(msg, size);
1071 }
1072
1073 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1074 {
1075     gni_return_t        status  =   GNI_RC_SUCCESS;
1076     uint8_t tag;
1077     CONTROL_MSG         *control_msg_tmp;
1078     int                 oob = ( mode & OUT_OF_BAND);
1079     SMSG_QUEUE          *queue;
1080
1081 #if USE_OOB
1082     queue = oob? &smsg_oob_queue : &smsg_queue;
1083 #else
1084     queue = &smsg_queue;
1085 #endif
1086
1087     LrtsPrepareEnvelope(msg, size);
1088
1089 #if PRINT_SYH
1090     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1091 #endif 
1092 #if CMK_SMP && COMM_THREAD_SEND
1093     if(size <= SMSG_MAX_MSG)
1094         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1095     else if (size < BIG_MSG) {
1096         control_msg_tmp =  construct_control_msg(size, msg, 0);
1097         buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1098     }
1099     else {
1100           CmiSetMsgSeq(msg, 0);
1101           control_msg_tmp =  construct_control_msg(size, msg, 1);
1102           buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
1103     }
1104 #else //non-smp, smp(worker sending)
1105     if(size <= SMSG_MAX_MSG)
1106     {
1107         status = send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0);  
1108         if(status == GNI_RC_SUCCESS)
1109             CmiFree(msg);
1110     }
1111     else if (size < BIG_MSG) {
1112         control_msg_tmp =  construct_control_msg(size, msg, 0);
1113         send_large_messages(queue, destNode, control_msg_tmp, 0);
1114     }
1115     else {
1116 #if     USE_LRTS_MEMPOOL
1117         CmiSetMsgSeq(msg, 0);
1118         control_msg_tmp =  construct_control_msg(size, msg, 1);
1119         send_large_messages(queue, destNode, control_msg_tmp, 0);
1120 #else
1121         control_msg_tmp =  construct_control_msg(size, msg, 0);
1122         send_large_messages(queue, destNode, control_msg_tmp, 0);
1123 #endif
1124     }
1125 #endif
1126     return 0;
1127 }
1128
1129 static void    PumpDatagramConnection();
1130 static void registerUserTraceEvents() {
1131 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1132     traceRegisterUserEvent("setting up connections", 10);
1133     traceRegisterUserEvent("Receiving small msgs", 20);
1134     traceRegisterUserEvent("Release local transaction", 30);
1135     traceRegisterUserEvent("Sending buffered small msgs", 40);
1136     traceRegisterUserEvent("Sending buffered rdma msgs", 50);
1137 #endif
1138 }
1139
1140 void LrtsPostCommonInit(int everReturn)
1141 {
1142 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1143     CpvInitialize(double, projTraceStart);
1144     /* only PE 0 needs to care about registration (to generate sts file). */
1145     if (CmiMyPe() == 0) {
1146         registerMachineUserEventsFunction(&registerUserTraceEvents);
1147     }
1148 #endif
1149
1150 #if CMK_SMP
1151     CmiIdleState *s=CmiNotifyGetState();
1152     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1153     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1154 #else
1155     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1156     if (useDynamicSMSG)
1157     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1158 #endif
1159 }
1160
1161 /* this is called by worker thread */
1162 void LrtsPostNonLocal(){
1163 #if CMK_SMP
1164 #if !COMM_THREAD_SEND
1165     if(mysize == 1) return;
1166     PumpLocalRdmaTransactions();
1167     SendBufferMsg();
1168     SendRdmaMsg();
1169 #endif
1170 #endif
1171 }
1172
1173 /* useDynamicSMSG */
1174 static void    PumpDatagramConnection()
1175 {
1176     uint32_t          remote_address;
1177     uint32_t          remote_id;
1178     gni_return_t status;
1179     gni_post_state_t  post_state;
1180     uint64_t          datagram_id;
1181     int i;
1182
1183    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1184    {
1185        if (datagram_id >= mysize) {           /* bound endpoint */
1186            int pe = datagram_id - mysize;
1187 #if CMK_SMP && !COMM_THREAD_SEND
1188            CmiLock(tx_cq_lock);
1189 #endif
1190            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1191 #if CMK_SMP && !COMM_THREAD_SEND
1192            CmiUnlock(tx_cq_lock);
1193 #endif
1194        if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1195        {
1196           CmiAssert(remote_id == pe);
1197           status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1198           GNI_RC_CHECK("Dynamic SMSG Init", status);
1199 #if PRINT_SYH
1200           printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1201 #endif
1202           CmiAssert(smsg_connected_flag[pe] == 1);
1203           smsg_connected_flag[pe] = 2;
1204        }
1205      }
1206      else {         /* unbound ep */
1207        status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1208        if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1209        {
1210           CmiAssert(remote_id<mysize);
1211           CmiAssert(smsg_connected_flag[remote_id] <= 0);
1212           status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1213           GNI_RC_CHECK("Dynamic SMSG Init", status);
1214 #if PRINT_SYH
1215           printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1216 #endif
1217           smsg_connected_flag[remote_id] = 2;
1218
1219           alloc_smsg_attr(&send_smsg_attr);
1220           status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1221           GNI_RC_CHECK("post unbound datagram", status);
1222         }
1223      }
1224    }
1225 }
1226
1227 /* pooling CQ to receive network message */
1228 static void PumpNetworkRdmaMsgs()
1229 {
1230     gni_cq_entry_t      event_data;
1231     gni_return_t        status;
1232
1233 }
1234
1235 inline 
1236 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
1237 {
1238     RDMA_REQUEST        *rdma_request_msg;
1239     MallocRdmaRequest(rdma_request_msg);
1240     rdma_request_msg->destNode = inst_id;
1241     rdma_request_msg->pd = pd;
1242 #if CMK_SMP
1243     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1244 #else
1245     if(sendRdmaBuf == 0)
1246     {
1247         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1248     }else{
1249         sendRdmaTail->next = rdma_request_msg;
1250         sendRdmaTail =  rdma_request_msg;
1251     }
1252 #endif
1253
1254 }
1255 static void getLargeMsgRequest(void* header, uint64_t inst_id);
1256 static void PumpNetworkSmsg()
1257 {
1258     uint64_t            inst_id;
1259     int                 ret;
1260     gni_cq_entry_t      event_data;
1261     gni_return_t        status;
1262     void                *header;
1263     uint8_t             msg_tag;
1264     int                 msg_nbytes;
1265     void                *msg_data;
1266     gni_mem_handle_t    msg_mem_hndl;
1267     gni_smsg_attr_t     *smsg_attr;
1268     gni_smsg_attr_t     *remote_smsg_attr;
1269     int                 init_flag;
1270     CONTROL_MSG         *control_msg_tmp, *header_tmp;
1271     uint64_t            source_addr;
1272     SMSG_QUEUE         *queue = &smsg_queue;
1273
1274 #if CMK_SMP && !COMM_THREAD_SEND
1275     while(1)
1276     {
1277         CmiLock(tx_cq_lock);
1278         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1279         CmiUnlock(tx_cq_lock);
1280         if(status != GNI_RC_SUCCESS)
1281             break;
1282 #else
1283     while ( (status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
1284     {
1285 #endif
1286         inst_id = GNI_CQ_GET_INST_ID(event_data);
1287         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1288 #if PRINT_SYH
1289         printf("[%d] PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
1290 #endif
1291         if (useDynamicSMSG) {
1292             /* subtle: smsg may come before connection is setup */
1293             while (smsg_connected_flag[inst_id] != 2) 
1294                PumpDatagramConnection();
1295         }
1296         msg_tag = GNI_SMSG_ANY_TAG;
1297 #if CMK_SMP && !COMM_THREAD_SEND
1298         while(1) {
1299             CmiLock(tx_cq_lock);
1300             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1301             CmiUnlock(tx_cq_lock);
1302             if (status != GNI_RC_SUCCESS)
1303                 break;
1304 #else
1305         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS){
1306 #endif
1307             /* copy msg out and then put into queue (small message) */
1308             switch (msg_tag) {
1309             case SMALL_DATA_TAG:
1310             {
1311                 START_EVENT();
1312                 msg_nbytes = CmiGetMsgSize(header);
1313                 msg_data    = CmiAlloc(msg_nbytes);
1314                 memcpy(msg_data, (char*)header, msg_nbytes);
1315 #if CMK_SMP_TRACE_COMMTHREAD
1316                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1317 #endif
1318                 handleOneRecvedMsg(msg_nbytes, msg_data);
1319                 break;
1320             }
1321             case LMSG_INIT_TAG:
1322             {
1323 #if PRINT_SYH
1324                 printf("[%d] from %d request for Large msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
1325 #endif
1326                 getLargeMsgRequest(header, inst_id);
1327                 break;
1328             }
1329             case ACK_TAG:   //msg fit into mempool
1330             {
1331                 /* Get is done, release message . Now put is not used yet*/
1332 #if ! USE_LRTS_MEMPOOL
1333                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((CONTROL_MSG *)header)->source_mem_hndl), &omdh);
1334 #else
1335                 DecreaseMsgInSend( ((void*)((CONTROL_MSG *) header)->source_addr));
1336 #endif
1337                 if(NoMsgInSend(((void*)((CONTROL_MSG *) header)->source_addr)))
1338                     buffered_send_msg -= GetMempoolsize((void*)((CONTROL_MSG *) header)->source_addr);
1339                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool, msg_tag); 
1340                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
1341                 break;
1342             }
1343             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
1344             {
1345                 header_tmp = (CONTROL_MSG *) header;
1346                 void *msg = (void*)(header_tmp->source_addr);
1347                 int cur_seq = CmiGetMsgSeq(header_tmp->source_addr);
1348                 int offset = ONE_SEG*(cur_seq+1);
1349                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh);
1350                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1351                 if (remain_size < 0) remain_size = 0;
1352                 CmiSetMsgSize(msg, remain_size);
1353                 if(remain_size <= 0) //transaction done
1354                 {
1355                     CmiFree(msg);
1356                 }else if (header_tmp->total_length > offset)
1357                 {
1358                     CmiSetMsgSeq(header_tmp->source_addr, cur_seq+1);
1359                     MallocControlMsg(control_msg_tmp);
1360                     control_msg_tmp->source_addr = header_tmp->source_addr;
1361                     control_msg_tmp->dest_addr    = header_tmp->dest_addr;
1362                     control_msg_tmp->total_length   = header_tmp->total_length; 
1363                     control_msg_tmp->length         = header_tmp->total_length - offset;
1364                     if (control_msg_tmp->length >= ONE_SEG) control_msg_tmp->length = ONE_SEG;
1365                     control_msg_tmp->seq_id         = cur_seq+1+1;
1366                     //send next seg
1367                     send_large_messages(queue, inst_id, control_msg_tmp, 0);
1368                          // pipelining
1369                     if (header_tmp->seq_id == 1) {
1370                       int i;
1371                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
1372                         int seq = cur_seq+i+2;
1373                         CmiSetMsgSeq(header_tmp->source_addr, seq-1);
1374                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)header_tmp->source_addr, seq);
1375                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
1376                         send_large_messages(queue, inst_id, control_msg_tmp, 0);
1377                         if (header_tmp->total_length <= ONE_SEG*seq) break;
1378                       }
1379                     }
1380                 }
1381                 break;
1382             }
1383
1384 #if CMK_PERSISTENT_COMM
1385             case PUT_DONE_TAG: //persistent message
1386             void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1387             int size = ((CONTROL_MSG *) header)->length;
1388             CmiReference(msg);
1389             handleOneRecvedMsg(size, msg); 
1390             break;
1391 #endif
1392 #ifdef CMK_DIRECT
1393             case DIRECT_PUT_DONE_TAG:  //cmi direct 
1394             (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1395            break;
1396 #endif
1397             default: {
1398                 printf("weird tag problem\n");
1399                 CmiAbort("Unknown tag\n");
1400                      }
1401             }
1402 #if CMK_SMP && !COMM_THREAD_SEND
1403             CmiLock(tx_cq_lock);
1404 #endif
1405             GNI_SmsgRelease(ep_hndl_array[inst_id]);
1406 #if CMK_SMP && !COMM_THREAD_SEND
1407             CmiUnlock(tx_cq_lock);
1408 #endif
1409             msg_tag = GNI_SMSG_ANY_TAG;
1410         } //endwhile getNext
1411     }   //end while GetEvent
1412     if(status == GNI_RC_ERROR_RESOURCE)
1413     {
1414         GNI_RC_CHECK("Smsg_rx_cq full", status);
1415     }
1416 }
1417
1418 static void printDesc(gni_post_descriptor_t *pd)
1419 {
1420     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
1421 }
1422
1423 // for BIG_MSG called on receiver side for receiving control message
1424 // LMSG_INIT_TAG
1425 static void getLargeMsgRequest(void* header, uint64_t inst_id )
1426 {
1427 #if     USE_LRTS_MEMPOOL
1428     CONTROL_MSG         *request_msg;
1429     gni_return_t        status = GNI_RC_SUCCESS;
1430     void                *msg_data;
1431     gni_post_descriptor_t *pd;
1432     gni_mem_handle_t    msg_mem_hndl;
1433     int source, size, transaction_size, offset = 0;
1434     int     register_size = 0;
1435     // initial a get to transfer data from the sender side */
1436     request_msg = (CONTROL_MSG *) header;
1437     size = request_msg->total_length;
1438     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool); 
1439     if(request_msg->seq_id < 2)   {
1440         msg_data = CmiAlloc(size);
1441         CmiSetMsgSeq(msg_data, 0);
1442         _MEMCHECK(msg_data);
1443     }
1444     else {
1445         offset = ONE_SEG*(request_msg->seq_id-1);
1446         msg_data = (char*)request_msg->dest_addr + offset;
1447     }
1448    
1449     MallocPostDesc(pd);
1450     pd->cqwrite_value = request_msg->seq_id;
1451     if( request_msg->seq_id == 0)
1452     {
1453         pd->local_mem_hndl= GetMemHndl(msg_data);
1454         transaction_size = ALIGN64(size);
1455         if(IsMemHndlZero(pd->local_mem_hndl))
1456         {   
1457             status = registerMempool((void*)(msg_data));
1458             if(status == GNI_RC_SUCCESS)
1459             {
1460                 pd->local_mem_hndl = GetMemHndl(msg_data);
1461             }
1462             else
1463             {
1464                 SetMemHndlZero(pd->local_mem_hndl);
1465             }
1466         }
1467         if(NoMsgInRecv( (void*)(msg_data)))
1468             register_size = GetMempoolsize((void*)(msg_data));
1469         else
1470             register_size = 0;
1471     }
1472     else{
1473         transaction_size = ALIGN64(request_msg->length);
1474         status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, transaction_size, &(pd->local_mem_hndl), &omdh);
1475         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1476         {
1477             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1478         }
1479     }
1480     pd->first_operand = ALIGN64(size);                   //  total length
1481
1482     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
1483         pd->type            = GNI_POST_FMA_GET;
1484     else
1485         pd->type            = GNI_POST_RDMA_GET;
1486 #if REMOTE_EVENT
1487     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1488 #else
1489     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1490 #endif
1491     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1492     pd->length          = transaction_size;
1493     pd->local_addr      = (uint64_t) msg_data;
1494     pd->remote_addr     = request_msg->source_addr + offset;
1495     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1496     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1497     pd->rdma_mode       = 0;
1498     pd->amo_cmd         = 0;
1499
1500     //memory registration success
1501     if(status == GNI_RC_SUCCESS)
1502     {
1503 #if CMK_SMP && !COMM_THREAD_SEND
1504         CmiLock(tx_cq_lock);
1505 #endif
1506         if(pd->type == GNI_POST_RDMA_GET) 
1507             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1508         else
1509             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1510 #if CMK_SMP && !COMM_THREAD_SEND
1511         CmiUnlock(tx_cq_lock);
1512 #endif
1513          
1514         if(status == GNI_RC_SUCCESS )
1515         {
1516             if(pd->cqwrite_value == 0)
1517             {
1518 #if MACHINE_DEBUG_LOG
1519                 buffered_recv_msg += register_size;
1520                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool); 
1521 #endif
1522                 IncreaseMsgInRecv(msg_data);
1523
1524             }
1525         }
1526     }else
1527     {
1528         SetMemHndlZero(pd->local_mem_hndl);
1529     }
1530     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1531     {
1532         bufferRdmaMsg(inst_id, pd); 
1533     }else {
1534         /* printf("source: %d pd:%p\n", source, pd); */
1535         GNI_RC_CHECK("AFter posting", status);
1536     }
1537 #else
1538     CONTROL_MSG         *request_msg;
1539     gni_return_t        status;
1540     void                *msg_data;
1541     gni_post_descriptor_t *pd;
1542     RDMA_REQUEST        *rdma_request_msg;
1543     gni_mem_handle_t    msg_mem_hndl;
1544     //int source;
1545     // initial a get to transfer data from the sender side */
1546     request_msg = (CONTROL_MSG *) header;
1547     msg_data = CmiAlloc(request_msg->length);
1548     _MEMCHECK(msg_data);
1549
1550     status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
1551
1552     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
1553     {
1554         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
1555     }
1556
1557     MallocPostDesc(pd);
1558     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
1559         pd->type            = GNI_POST_FMA_GET;
1560     else
1561         pd->type            = GNI_POST_RDMA_GET;
1562 #if REMOTE_EVENT
1563     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
1564 #else
1565     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
1566 #endif
1567     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1568     pd->length          = ALIGN64(request_msg->length);
1569     pd->local_addr      = (uint64_t) msg_data;
1570     pd->remote_addr     = request_msg->source_addr;
1571     pd->remote_mem_hndl = request_msg->source_mem_hndl;
1572     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
1573     pd->rdma_mode       = 0;
1574     pd->amo_cmd         = 0;
1575
1576     //memory registration successful
1577     if(status == GNI_RC_SUCCESS)
1578     {
1579         pd->local_mem_hndl  = msg_mem_hndl;
1580 #if CMK_SMP && !COMM_THREAD_SEND
1581             CmiLock(tx_cq_lock);
1582 #endif
1583         if(pd->type == GNI_POST_RDMA_GET) 
1584             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
1585         else
1586             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
1587 #if CMK_SMP && !COMM_THREAD_SEND
1588             CmiUnlock(tx_cq_lock);
1589 #endif
1590     }else
1591     {
1592         SetMemHndlZero(pd->local_mem_hndl);
1593     }
1594     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
1595     {
1596         MallocRdmaRequest(rdma_request_msg);
1597         rdma_request_msg->next = 0;
1598         rdma_request_msg->destNode = inst_id;
1599         rdma_request_msg->pd = pd;
1600         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1601     }else {
1602         GNI_RC_CHECK("AFter posting", status);
1603     }
1604 #endif
1605 }
1606
1607 static void PumpLocalRdmaTransactions()
1608 {
1609     gni_cq_entry_t          ev;
1610     gni_return_t            status;
1611     uint64_t                type, inst_id;
1612     gni_post_descriptor_t   *tmp_pd;
1613     MSG_LIST                *ptr;
1614     CONTROL_MSG             *ack_msg_tmp;
1615     uint8_t             msg_tag;
1616 #ifdef CMK_DIRECT
1617     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
1618 #endif
1619     SMSG_QUEUE         *queue = &smsg_queue;
1620
1621 #if CMK_SMP && !COMM_THREAD_SEND
1622     while(1) {
1623         CmiLock(tx_cq_lock);
1624         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
1625         CmiUnlock(tx_cq_lock);
1626         if(status != GNI_RC_SUCCESS)
1627             break;
1628 #else
1629     while( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
1630     {
1631 #endif
1632         type = GNI_CQ_GET_TYPE(ev);
1633         if (type == GNI_CQ_EVENT_TYPE_POST)
1634         {
1635             inst_id     = GNI_CQ_GET_INST_ID(ev);
1636 #if PRINT_SYH
1637             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
1638 #endif
1639 #if CMK_SMP && !COMM_THREAD_SEND
1640             CmiLock(tx_cq_lock);
1641 #endif
1642             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
1643 #if CMK_SMP && !COMM_THREAD_SEND
1644             CmiUnlock(tx_cq_lock);
1645 #endif
1646 #ifdef CMK_DIRECT
1647             if(tmp_pd->amo_cmd == 1)
1648             {
1649                 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
1650                 cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
1651                 cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
1652                 cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
1653             }
1654             else{
1655                 MallocControlMsg(ack_msg_tmp);
1656                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1657                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1658             }
1659 #else
1660             MallocControlMsg(ack_msg_tmp);
1661             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
1662             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
1663 #endif
1664             ////Message is sent, free message , put is not used now
1665             switch (tmp_pd->type) {
1666 #if CMK_PERSISTENT_COMM
1667             case GNI_POST_RDMA_PUT:
1668 #if ! USE_LRTS_MEMPOOL
1669                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1670 #endif
1671             case GNI_POST_FMA_PUT:
1672                 CmiFree((void *)tmp_pd->local_addr);
1673                 msg_tag = PUT_DONE_TAG;
1674                 break;
1675 #endif
1676 #ifdef CMK_DIRECT
1677             case GNI_POST_RDMA_PUT:
1678             case GNI_POST_FMA_PUT:
1679                 //sender ACK to receiver to trigger it is done
1680                 msg_tag = DIRECT_PUT_DONE_TAG;
1681                 break;
1682 #endif
1683             case GNI_POST_RDMA_GET:
1684             case GNI_POST_FMA_GET:
1685 #if  ! USE_LRTS_MEMPOOL
1686                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1687                 msg_tag = ACK_TAG;  
1688 #else
1689                 ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
1690                 if(ack_msg_tmp->seq_id > 0)      // BIG_MSG
1691                 {
1692                     msg_tag = BIG_MSG_TAG; 
1693                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
1694                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
1695                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
1696                 } 
1697                 else
1698                 {
1699                     msg_tag = ACK_TAG;  
1700                     ack_msg_tmp->dest_addr = tmp_pd->local_addr;
1701                 }
1702                 ack_msg_tmp->length = tmp_pd->length;
1703                 ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
1704 #endif
1705                 break;
1706             default:
1707                 CmiPrintf("type=%d\n", tmp_pd->type);
1708                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
1709             }
1710 #if CMK_DIRECT
1711             if(tmp_pd->amo_cmd == 1)
1712                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
1713             else
1714 #endif
1715                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
1716             if(status == GNI_RC_SUCCESS)
1717             {
1718 #if CMK_DIRECT
1719                 if(tmp_pd->amo_cmd == 1)
1720                     free(cmk_direct_done_msg); 
1721                 else
1722 #endif
1723                     FreeControlMsg(ack_msg_tmp);
1724
1725             }
1726 #if CMK_PERSISTENT_COMM
1727             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
1728 #endif
1729             {
1730                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
1731 #if PRINT_SYH
1732                     printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
1733 #endif
1734 #if CMK_SMP_TRACE_COMMTHREAD
1735                     START_EVENT();
1736 #endif
1737                     CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
1738                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
1739 #if MACHINE_DEBUG_LOG
1740                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
1741                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
1742                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool, msg_tag); 
1743 #endif
1744 #if CMK_SMP_TRACE_COMMTHREAD
1745                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
1746 #endif
1747                     handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
1748                 }else if(msg_tag == BIG_MSG_TAG){
1749                   void *msg = (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
1750                   CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
1751                   if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
1752 #if CMK_SMP_TRACE_COMMTHREAD
1753                     START_EVENT();
1754 #endif
1755 #if PRINT_SYH
1756                     printf("Pipeline msg done [%d]\n", myrank);
1757 #endif
1758 #if CMK_SMP_TRACE_COMMTHREAD
1759                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
1760 #endif
1761                     handleOneRecvedMsg(tmp_pd->first_operand, msg); 
1762                   }
1763                 }
1764             }
1765             FreePostDesc(tmp_pd);
1766         }
1767     } //end while
1768     if(status == GNI_RC_ERROR_RESOURCE)
1769     {
1770         GNI_RC_CHECK("Smsg_tx_cq full", status);
1771     }
1772 }
1773
1774 static void  SendRdmaMsg()
1775 {
1776     gni_return_t            status = GNI_RC_SUCCESS;
1777     gni_mem_handle_t        msg_mem_hndl;
1778     RDMA_REQUEST *ptr = 0, *tmp_ptr;
1779     RDMA_REQUEST *pre = 0;
1780     int i, register_size = 0;
1781
1782 #if CMK_SMP
1783     int len = PCQueueLength(sendRdmaBuf);
1784     for (i=0; i<len; i++)
1785     {
1786         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
1787         if (ptr == NULL) break;
1788 #else
1789     ptr = sendRdmaBuf;
1790     while (ptr!=0)
1791     {
1792 #endif 
1793         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_mempool); 
1794         gni_post_descriptor_t *pd = ptr->pd;
1795         status = GNI_RC_SUCCESS;
1796         
1797         if(pd->cqwrite_value == 0)
1798         {
1799             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
1800             {
1801                 status = registerMempool((void*)(pd->local_addr));
1802                 if(status == GNI_RC_SUCCESS)
1803                 {
1804                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1805                 }
1806             }else
1807             {
1808                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
1809             }
1810             if(NoMsgInRecv( (void*)(pd->local_addr)))
1811                 register_size = GetMempoolsize((void*)(pd->local_addr));
1812             else
1813                 register_size = 0;
1814         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
1815         {
1816             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
1817         }
1818         if(status == GNI_RC_SUCCESS)        //mem register good
1819         {
1820 #if CMK_SMP && !COMM_THREAD_SEND
1821             CmiLock(tx_cq_lock);
1822 #endif
1823             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
1824                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
1825             else
1826                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
1827 #if CMK_SMP && !COMM_THREAD_SEND
1828             CmiUnlock(tx_cq_lock);
1829 #endif
1830
1831             if(status == GNI_RC_SUCCESS)    //post good
1832             {
1833 #if !CMK_SMP
1834                 tmp_ptr = ptr;
1835                 if(pre != 0) {
1836                     pre->next = ptr->next;
1837                 }
1838                 else {
1839                     sendRdmaBuf = ptr->next;
1840                 }
1841                 ptr = ptr->next;
1842                 FreeRdmaRequest(tmp_ptr);
1843 #endif
1844                 if(pd->cqwrite_value == 0)
1845                 {
1846                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
1847                 }
1848 #if MACHINE_DEBUG_LOG
1849                 buffered_recv_msg += register_size;
1850                 MACHSTATE(8, "GO request from buffered\n"); 
1851 #endif
1852             }else           // cannot post
1853             {
1854 #if CMK_SMP
1855                 PCQueuePush(sendRdmaBuf, (char*)ptr);
1856 #else
1857                 pre = ptr;
1858                 ptr = ptr->next;
1859 #endif
1860                 break;
1861             }
1862         } else          //memory registration fails
1863         {
1864 #if CMK_SMP
1865             PCQueuePush(sendRdmaBuf, (char*)ptr);
1866 #else
1867             pre = ptr;
1868             ptr = ptr->next;
1869 #endif
1870         }
1871     } //end while
1872 #if ! CMK_SMP
1873     if(ptr == 0)
1874         sendRdmaTail = pre;
1875 #endif
1876 }
1877
1878 // return 1 if all messages are sent
1879 static int SendBufferMsg(SMSG_QUEUE *queue)
1880 {
1881     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
1882     CONTROL_MSG         *control_msg_tmp;
1883     send_gni_return_t  send_status = SEND_GNI_NO_RESOURCE;
1884     gni_return_t        status;
1885     int                 done = 1;
1886     int                 register_size;
1887     void                *register_addr;
1888     int                 index_previous = -1;
1889     int                 index = queue->smsg_head_index;
1890 #if CMI_EXERT_SEND_CAP
1891     int                 sent_cnt = 0;
1892 #endif
1893
1894 #if ! CMK_SMP
1895     index = queue->smsg_head_index;
1896 #else
1897     index = 0;
1898 #endif
1899 #if CMK_SMP
1900     while(index <mysize)
1901     {
1902         int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
1903         for (i=0; i<len; i++) 
1904         {
1905             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
1906             if (ptr == NULL) break;
1907 #else
1908     while(index != -1)
1909     {
1910         ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
1911         pre = 0;
1912         while(ptr != 0)
1913         {
1914 #endif
1915             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_mempool, ptr->tag); 
1916             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
1917                 /* connection not exists yet */
1918               done = 0;
1919               break;
1920             }
1921             status = GNI_RC_ERROR_RESOURCE;
1922             send_status = SEND_GNI_NO_RESOURCE;
1923             switch(ptr->tag)
1924             {
1925             case SMALL_DATA_TAG:
1926                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
1927                 if(status == GNI_RC_SUCCESS)
1928                 {
1929                     send_status = SEND_GNI_SUCCESS;
1930                     CmiFree(ptr->msg);
1931                 }
1932                 break;
1933             case LMSG_INIT_TAG:
1934                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
1935                 send_status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
1936                 if(send_status != SEND_GNI_SUCCESS)
1937                     done = 0;
1938                 break;
1939             case   ACK_TAG:
1940             case   BIG_MSG_TAG:
1941                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
1942                 if(status == GNI_RC_SUCCESS)
1943                 {
1944                     send_status = SEND_GNI_SUCCESS;
1945                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
1946                 }else
1947                     done = 0;
1948                 break;
1949 #ifdef CMK_DIRECT
1950             case DIRECT_PUT_DONE_TAG:
1951                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
1952                 if(status == GNI_RC_SUCCESS)
1953                 {
1954                     send_status = SEND_GNI_SUCCESS;
1955                     free((CMK_DIRECT_HEADER*)ptr->msg);
1956                 }
1957                 break;
1958
1959 #endif
1960             default:
1961                 printf("Weird tag\n");
1962                 CmiAbort("should not happen\n");
1963             }
1964             if(send_status == SEND_GNI_SUCCESS)
1965             {
1966 #if !CMK_SMP
1967                 tmp_ptr = ptr;
1968                 if(pre)
1969                 {
1970                     ptr = pre ->next = ptr->next;
1971                 }else
1972                 {
1973                     ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
1974                 }
1975                 FreeMsgList(tmp_ptr);
1976 #else
1977                 FreeMsgList(ptr);
1978 #endif
1979 #if PRINT_SYH
1980                 buffered_smsg_counter--;
1981                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
1982 #endif
1983 #if CMI_EXERT_SEND_CAP
1984                 sent_cnt++;
1985                 if(sent_cnt == SEND_CAP)
1986                     break;
1987 #endif
1988             }else {
1989 #if CMK_SMP
1990                 PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
1991 #else
1992                 pre = ptr;
1993                 ptr=ptr->next;
1994 #endif
1995                 done = 0;
1996                 if(send_status == SEND_GNI_NO_RESOURCE)
1997                     break;
1998             } 
1999         } //end while
2000 #if !CMK_SMP
2001         if(ptr == 0)
2002             queue->smsg_msglist_index[index].tail = pre;
2003         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2004         {
2005             if(index_previous != -1)
2006                 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2007             else
2008                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2009         }else
2010         {
2011             index_previous = index;
2012         }
2013         index = queue->smsg_msglist_index[index].next;
2014 #else
2015         index++;
2016 #endif
2017
2018 #if CMI_EXERT_SEND_CAP
2019         if(sent_cnt == SEND_CAP)
2020                 break;
2021 #endif
2022     }   // end pooling for all cores
2023     return done;
2024 }
2025
2026 void LrtsAdvanceCommunication(int whileidle)
2027 {
2028     /*  Receive Msg first */
2029     MACHSTATE(8, "calling advance comm \n") ; 
2030 #if CMK_SMP_TRACE_COMMTHREAD
2031     double startT, endT;
2032 #endif
2033     if (useDynamicSMSG && whileidle)
2034     {
2035 #if CMK_SMP_TRACE_COMMTHREAD
2036         startT = CmiWallTimer();
2037 #endif
2038         PumpDatagramConnection();
2039 #if CMK_SMP_TRACE_COMMTHREAD
2040         endT = CmiWallTimer();
2041         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(10, startT, endT);
2042 #endif
2043     }
2044
2045 #if CMK_SMP_TRACE_COMMTHREAD
2046     startT = CmiWallTimer();
2047 #endif
2048     PumpNetworkSmsg();
2049     MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
2050 #if CMK_SMP_TRACE_COMMTHREAD
2051     endT = CmiWallTimer();
2052     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
2053 #endif
2054
2055 #if CMK_SMP_TRACE_COMMTHREAD
2056     startT = CmiWallTimer();
2057 #endif
2058     PumpLocalRdmaTransactions();
2059     MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
2060 #if CMK_SMP_TRACE_COMMTHREAD
2061     endT = CmiWallTimer();
2062     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
2063 #endif
2064     /* Send buffered Message */
2065 #if CMK_SMP_TRACE_COMMTHREAD
2066     startT = CmiWallTimer();
2067 #endif
2068 #if USE_OOB
2069     SendBufferMsg(&smsg_oob_queue);
2070 #endif
2071     SendBufferMsg(&smsg_queue);
2072     MACHSTATE(8, "after SendBufferMsg\n") ; 
2073 #if CMK_SMP_TRACE_COMMTHREAD
2074     endT = CmiWallTimer();
2075     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
2076 #endif
2077
2078 #if CMK_SMP_TRACE_COMMTHREAD
2079     startT = CmiWallTimer();
2080 #endif
2081     SendRdmaMsg();
2082     MACHSTATE(8, "after SendRdmaMsg\n") ; 
2083 #if CMK_SMP_TRACE_COMMTHREAD
2084     endT = CmiWallTimer();
2085     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
2086 #endif
2087 }
2088
2089 /* useDynamicSMSG */
2090 static void _init_dynamic_smsg()
2091 {
2092     gni_return_t status;
2093     uint32_t     vmdh_index = -1;
2094     int i;
2095
2096     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2097     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2098     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2099     for(i=0; i<mysize; i++) {
2100         smsg_connected_flag[i] = 0;
2101         smsg_attr_vector_local[i] = NULL;
2102         smsg_attr_vector_remote[i] = NULL;
2103     }
2104     if(mysize <=512)
2105     {
2106         SMSG_MAX_MSG = 4096;
2107     }else if (mysize <= 4096)
2108     {
2109         SMSG_MAX_MSG = 4096/mysize * 1024;
2110     }else if (mysize <= 16384)
2111     {
2112         SMSG_MAX_MSG = 512;
2113     }else {
2114         SMSG_MAX_MSG = 256;
2115     }
2116
2117     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2118     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2119     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2120     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2121     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2122
2123     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2124     mailbox_list->size = smsg_memlen*avg_smsg_connection;
2125     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2126     bzero(mailbox_list->mailbox_base, mailbox_list->size);
2127     mailbox_list->offset = 0;
2128     mailbox_list->next = 0;
2129     
2130     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2131         mailbox_list->size, smsg_rx_cqh,
2132         GNI_MEM_READWRITE,   
2133         vmdh_index,
2134         &(mailbox_list->mem_hndl));
2135     GNI_RC_CHECK("MEMORY registration for smsg", status);
2136
2137     status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
2138     GNI_RC_CHECK("Unbound EP", status);
2139     
2140     alloc_smsg_attr(&send_smsg_attr);
2141
2142     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2143     GNI_RC_CHECK("post unbound datagram", status);
2144
2145       /* always pre-connect to proc 0 */
2146     //if (myrank != 0) connect_to(0);
2147 }
2148
2149 static void _init_static_smsg()
2150 {
2151     gni_smsg_attr_t      *smsg_attr;
2152     gni_smsg_attr_t      remote_smsg_attr;
2153     gni_smsg_attr_t      *smsg_attr_vec;
2154     gni_mem_handle_t     my_smsg_mdh_mailbox;
2155     int      ret, i;
2156     gni_return_t status;
2157     uint32_t              vmdh_index = -1;
2158     mdh_addr_t            base_infor;
2159     mdh_addr_t            *base_addr_vec;
2160     char *env;
2161
2162     if(mysize <=512)
2163     {
2164         SMSG_MAX_MSG = 1024;
2165     }else if (mysize <= 4096)
2166     {
2167         SMSG_MAX_MSG = 1024;
2168     }else if (mysize <= 16384)
2169     {
2170         SMSG_MAX_MSG = 512;
2171     }else {
2172         SMSG_MAX_MSG = 256;
2173     }
2174     
2175     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2176     if (env) SMSG_MAX_MSG = atoi(env);
2177     CmiAssert(SMSG_MAX_MSG > 0);
2178
2179     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2180     
2181     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2182     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2183     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2184     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2185     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2186     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2187     CmiAssert(ret == 0);
2188     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2189     //if (myrank == 0) printf("Charm++> allocates %.2fMB for SMSG mailbox. \n", smsg_memlen*mysize/1e6);
2190     
2191     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2192             smsg_memlen*(mysize), smsg_rx_cqh,
2193             GNI_MEM_READWRITE,   
2194             vmdh_index,
2195             &my_smsg_mdh_mailbox);
2196
2197     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2198
2199     base_infor.addr =  (uint64_t)smsg_mailbox_base;
2200     base_infor.mdh =  my_smsg_mdh_mailbox;
2201     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2202
2203     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
2204  
2205     for(i=0; i<mysize; i++)
2206     {
2207         if(i==myrank)
2208             continue;
2209         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2210         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2211         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2212         smsg_attr[i].mbox_offset = i*smsg_memlen;
2213         smsg_attr[i].buff_size = smsg_memlen;
2214         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2215         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2216     }
2217
2218     for(i=0; i<mysize; i++)
2219     {
2220         if (myrank == i) continue;
2221
2222         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2223         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2224         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2225         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2226         remote_smsg_attr.buff_size = smsg_memlen;
2227         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2228         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
2229
2230         /* initialize the smsg channel */
2231         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
2232         GNI_RC_CHECK("SMSG Init", status);
2233     } //end initialization
2234
2235     free(base_addr_vec);
2236     free(smsg_attr);
2237
2238     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
2239     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
2240
2241
2242 inline
2243 static void _init_send_queue(SMSG_QUEUE *queue)
2244 {
2245      int i;
2246      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
2247      for(i =0; i<mysize; i++)
2248      {
2249         queue->smsg_msglist_index[i].next = -1;
2250 #if CMK_SMP
2251         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
2252 #else
2253         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
2254 #endif
2255         
2256      }
2257      queue->smsg_head_index = -1;
2258 }
2259
2260 inline
2261 static void _init_smsg()
2262 {
2263     if(mysize > 1) {
2264         if (useDynamicSMSG)
2265             _init_dynamic_smsg();
2266         else
2267             _init_static_smsg();
2268     }
2269
2270     _init_send_queue(&smsg_queue);
2271 #if USE_OOB
2272     _init_send_queue(&smsg_oob_queue);
2273 #endif
2274 }
2275
2276 static void _init_static_msgq()
2277 {
2278     gni_return_t status;
2279     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
2280     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
2281     msgq_attrs.smsg_q_sz = 1;
2282     msgq_attrs.rcv_pool_sz = 1;
2283     msgq_attrs.num_msgq_eps = 2;
2284     msgq_attrs.nloc_insts = 8;
2285     msgq_attrs.modes = 0;
2286     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
2287
2288     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
2289     GNI_RC_CHECK("MSGQ Init", status);
2290
2291
2292 }
2293
2294 #if CMK_SMP && STEAL_MEMPOOL
2295 void *steal_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl)
2296 {
2297     void *pool = NULL;
2298     int i, k;
2299     // check other ranks
2300     for (k=0; k<CmiMyNodeSize()+1; k++) {
2301         i = (CmiMyRank()+k)%CmiMyNodeSize();
2302         if (i==CmiMyRank()) continue;
2303         mempool_type *mptr = CpvAccessOther(mempool, i);
2304         CmiLock(mptr->mempoolLock);
2305         mempool_block *tail =  (mempool_block *)((char*)mptr + mptr->memblock_tail);
2306         if ((char*)tail == (char*)mptr) {     /* this is the only memblock */
2307             CmiUnlock(mptr->mempoolLock);
2308             continue;
2309         }
2310         mempool_header *header = (mempool_header*)((char*)tail + sizeof(mempool_block));
2311         if (header->size >= *size && header->size == tail->size - sizeof(mempool_block)) {
2312             /* search in the free list */
2313           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2314           mempool_header *current = free_header;
2315           while (current) {
2316             if (current->next_free == (char*)header-(char*)mptr) break;
2317             current = current->next_free?(mempool_header*)((char*)mptr + current->next_free):NULL;
2318           }
2319           if (current == NULL) {         /* not found in free list */
2320             CmiUnlock(mptr->mempoolLock);
2321             continue;
2322           }
2323 printf("[%d:%d:%d] steal from %d tail: %p size: %d %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, tail, header->size, tail->size, sizeof(mempool_block));
2324             /* search the previous memblock, and remove the tail */
2325           mempool_block *ptr = (mempool_block *)mptr;
2326           while (ptr) {
2327             if (ptr->memblock_next == mptr->memblock_tail) break;
2328             ptr = ptr->memblock_next?(mempool_block *)((char*)mptr + ptr->memblock_next):NULL;
2329           }
2330           CmiAssert(ptr!=NULL);
2331           ptr->memblock_next = 0;
2332           mptr->memblock_tail = (char*)ptr - (char*)mptr;
2333
2334             /* remove memblock from the free list */
2335           current->next_free = header->next_free;
2336           if (header == free_header) mptr->freelist_head = header->next_free;
2337
2338           CmiUnlock(mptr->mempoolLock);
2339
2340           pool = (void*)tail;
2341           *mem_hndl = tail->mem_hndl;
2342           *size = tail->size;
2343           return pool;
2344         }
2345         CmiUnlock(mptr->mempoolLock);
2346     }
2347
2348       /* steal failed, deregister and free memblock now */
2349     int freed = 0;
2350     for (k=0; k<CmiMyNodeSize()+1; k++) {
2351         i = (CmiMyRank()+k)%CmiMyNodeSize();
2352         mempool_type *mptr = CpvAccessOther(mempool, i);
2353         if (i!=CmiMyRank()) CmiLock(mptr->mempoolLock);
2354
2355         mempool_block *mempools_head = &(mptr->mempools_head);
2356         mempool_block *current = mempools_head;
2357         mempool_block *prev = NULL;
2358
2359         while (current) {
2360           int isfree = 0;
2361           mempool_header *free_header = mptr->freelist_head?(mempool_header*)((char*)mptr+mptr->freelist_head):NULL;
2362 printf("[%d:%d:%d] checking rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, current, current->size, *size);
2363           mempool_header *cur = free_header;
2364           mempool_header *header;
2365           if (current != mempools_head) {
2366             header = (mempool_header*)((char*)current + sizeof(mempool_block));
2367              /* search in free list */
2368             if (header->size == current->size - sizeof(mempool_block)) {
2369               cur = free_header;
2370               while (cur) {
2371                 if (cur->next_free == (char*)header-(char*)mptr) break;
2372                 cur = cur->next_free?(mempool_header*)((char*)mptr + cur->next_free):NULL;
2373               }
2374               if (cur != NULL) isfree = 1;
2375             }
2376           }
2377           if (isfree) {
2378               /* remove from free list */
2379             cur->next_free = header->next_free;
2380             if (header == free_header) mptr->freelist_head = header->next_free;
2381              // deregister
2382             gni_return_t status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &current->mem_hndl, &omdh);
2383             GNI_RC_CHECK("steal Mempool de-register", status);
2384             mempool_block *ptr = current;
2385             current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2386             prev->memblock_next = current?(char*)current - (char*)mptr:0;
2387 printf("[%d:%d:%d] free rank: %d ptr: %p size: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, ptr, ptr->size, *size);
2388             freed += ptr->size;
2389             free(ptr);
2390              // try now
2391             if (freed > *size) {
2392               if (pool == NULL) {
2393                 int ret = posix_memalign(&pool, ALIGNBUF, *size);
2394                 CmiAssert(ret == 0);
2395               }
2396               status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh);
2397               if (status == GNI_RC_SUCCESS) {
2398                 if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2399 printf("[%d:%d:%d] GOT IT rank: %d wanted: %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size);
2400                 return pool;
2401               }
2402 printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), i, *size, status);
2403             }
2404           }
2405           else {
2406              prev = current;
2407              current = current->memblock_next?(mempool_block *)((char*)mptr+current->memblock_next):NULL;
2408           }
2409         }
2410
2411         if (i!=CmiMyRank()) CmiUnlock(mptr->mempoolLock);
2412     }
2413       /* still no luck registering pool */
2414     if (pool) free(pool);
2415     return NULL;
2416 }
2417 #endif
2418 static long long int total_mempool_size = 0;
2419 static long long int total_mempool_calls = 0;
2420 #if USE_LRTS_MEMPOOL
2421 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
2422 {
2423     void *pool;
2424     int ret;
2425
2426     int default_size =  expand_flag? _expand_mem : _mempool_size;
2427     if (*size < default_size) *size = default_size;
2428     total_mempool_size += *size;
2429     total_mempool_calls += 1;
2430     ret = posix_memalign(&pool, ALIGNBUF, *size);
2431     if (ret != 0) {
2432 #if CMK_SMP && STEAL_MEMPOOL
2433       pool = steal_mempool_block(size, mem_hndl);
2434       if (pool != NULL) return pool;
2435 #endif
2436       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
2437       if (ret == ENOMEM)
2438         CmiAbort("alloc_mempool_block: out of memory.");
2439       else
2440         CmiAbort("alloc_mempool_block: posix_memalign failed");
2441     }
2442         SetMemHndlZero((*mem_hndl));
2443     
2444     return pool;
2445 }
2446
2447 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
2448 {
2449     if(!(IsMemHndlZero(mem_hndl)))
2450     {
2451         gni_return_t status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2452         GNI_RC_CHECK("free_mempool_block Mempool de-register", status);
2453     }
2454     free(ptr);
2455 }
2456 #endif
2457
2458 void LrtsPreCommonInit(int everReturn){
2459 #if USE_LRTS_MEMPOOL
2460     CpvInitialize(mempool_type*, mempool);
2461     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block);
2462     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
2463 #endif
2464 }
2465
2466
2467 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2468 {
2469     register int            i;
2470     int                     rc;
2471     int                     device_id = 0;
2472     unsigned int            remote_addr;
2473     gni_cdm_handle_t        cdm_hndl;
2474     gni_return_t            status = GNI_RC_SUCCESS;
2475     uint32_t                vmdh_index = -1;
2476     uint8_t                 ptag;
2477     unsigned int            local_addr, *MPID_UGNI_AllAddr;
2478     int                     first_spawned;
2479     int                     physicalID;
2480     char                   *env;
2481
2482     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
2483     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
2484     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
2485    
2486     status = PMI_Init(&first_spawned);
2487     GNI_RC_CHECK("PMI_Init", status);
2488
2489     status = PMI_Get_size(&mysize);
2490     GNI_RC_CHECK("PMI_Getsize", status);
2491
2492     status = PMI_Get_rank(&myrank);
2493     GNI_RC_CHECK("PMI_getrank", status);
2494
2495     //physicalID = CmiPhysicalNodeID(myrank);
2496     
2497     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
2498
2499     *myNodeID = myrank;
2500     *numNodes = mysize;
2501   
2502 #if !COMM_THREAD_SEND
2503     /* Currently, we only consider the case that comm. thread will only recv msgs */
2504     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
2505 #endif
2506
2507     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
2508     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
2509     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
2510
2511     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
2512     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
2513     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
2514
2515     useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
2516     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
2517     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
2518     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
2519     
2520     if(myrank == 0)
2521     {
2522         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
2523         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
2524     }
2525 #ifdef USE_ONESIDED
2526     onesided_init(NULL, &onesided_hnd);
2527
2528     // this is a GNI test, so use the libonesided bypass functionality
2529     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
2530     local_addr = gniGetNicAddress();
2531 #else
2532     ptag = get_ptag();
2533     cookie = get_cookie();
2534 #if 0
2535     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
2536 #endif
2537     //Create and attach to the communication  domain */
2538     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
2539     GNI_RC_CHECK("GNI_CdmCreate", status);
2540     //* device id The device id is the minor number for the device
2541     //that is assigned to the device by the system when the device is created.
2542     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
2543     //where X is the device number 0 default 
2544     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
2545     GNI_RC_CHECK("GNI_CdmAttach", status);
2546     local_addr = get_gni_nic_address(0);
2547 #endif
2548     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
2549     _MEMCHECK(MPID_UGNI_AllAddr);
2550     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
2551     /* create the local completion queue */
2552     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
2553     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
2554     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
2555     
2556     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
2557     GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
2558     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
2559
2560     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
2561     GNI_RC_CHECK("Create CQ (rx)", status);
2562     
2563     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
2564     //GNI_RC_CHECK("Create Post CQ (rx)", status);
2565     
2566     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
2567     //GNI_RC_CHECK("Create BTE CQ", status);
2568
2569     /* create the endpoints. they need to be bound to allow later CQWrites to them */
2570     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
2571     _MEMCHECK(ep_hndl_array);
2572 #if CMK_SMP && !COMM_THREAD_SEND
2573     tx_cq_lock  = CmiCreateLock();
2574     rx_cq_lock  = CmiCreateLock();
2575 #endif
2576     for (i=0; i<mysize; i++) {
2577         if(i == myrank) continue;
2578         status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
2579         GNI_RC_CHECK("GNI_EpCreate ", status);   
2580         remote_addr = MPID_UGNI_AllAddr[i];
2581         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
2582         GNI_RC_CHECK("GNI_EpBind ", status);   
2583     }
2584
2585     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
2586     _init_smsg();
2587     PMI_Barrier();
2588
2589 #if     USE_LRTS_MEMPOOL
2590     env = getenv("CHARM_UGNI_MEMPOOL_SIZE");
2591     if (env) _mempool_size = CmiReadSize(env);
2592     if (CmiGetArgStringDesc(*argv,"+useMemorypoolSize",&env,"Set the memory pool size")) 
2593         _mempool_size = CmiReadSize(env);
2594
2595     if (myrank==0) printf("Charm++> memory pool size: %1.fMB\n", _mempool_size/1024.0/1024);
2596
2597     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
2598     if (env) {
2599         MAX_REG_MEM = CmiReadSize(env);
2600         if (myrank==0) printf("Charm++> memory pool maximum size: %1.fMB\n", MAX_REG_MEM/1024.0/1024);
2601     }
2602
2603     env = getenv("CHARM_UGNI_SEND_MAX");
2604     if (env) {
2605         MAX_BUFF_SEND = CmiReadSize(env);
2606         if (myrank==0) printf("Charm++> maximum pending memory pool for sending: %1.fMB\n", MAX_BUFF_SEND/1024.0/1024);
2607     }
2608
2609     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
2610     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
2611 #endif
2612
2613     /* init DMA buffer for medium message */
2614
2615     //_init_DMA_buffer();
2616     
2617     free(MPID_UGNI_AllAddr);
2618 #if CMK_SMP
2619     sendRdmaBuf = PCQueueCreate();
2620 #else
2621     sendRdmaBuf = 0;
2622 #endif
2623 #if MACHINE_DEBUG_LOG
2624     char ln[200];
2625     sprintf(ln,"debugLog.%d",myrank);
2626     debugLog=fopen(ln,"w");
2627 #endif
2628
2629 }
2630
2631 void* LrtsAlloc(int n_bytes, int header)
2632 {
2633     void *ptr;
2634 #if 0
2635     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
2636 #endif
2637     if(n_bytes <= SMSG_MAX_MSG)
2638     {
2639         int totalsize = n_bytes+header;
2640         ptr = malloc(totalsize);
2641     }
2642     else {
2643         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
2644 #if     USE_LRTS_MEMPOOL
2645         n_bytes = ALIGN64(n_bytes);
2646         if(n_bytes <= BIG_MSG)
2647         {
2648             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
2649             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
2650         }else 
2651         {
2652             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2653             ptr = res + ALIGNBUF - header;
2654
2655         }
2656 #else
2657         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
2658         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
2659         ptr = res + ALIGNBUF - header;
2660 #endif
2661     }
2662     return ptr;
2663 }
2664
2665 void  LrtsFree(void *msg)
2666 {
2667     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
2668     if (size <= SMSG_MAX_MSG)
2669       free(msg);
2670     else if(size>BIG_MSG)
2671     {
2672         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2673
2674     }else
2675     {
2676 #if     USE_LRTS_MEMPOOL
2677 #if CMK_SMP
2678         mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2679 #else
2680         mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
2681 #endif
2682 #else
2683         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
2684 #endif
2685     }
2686 }
2687
2688 void LrtsExit()
2689 {
2690     /* free memory ? */
2691 #if USE_LRTS_MEMPOOL
2692     mempool_destroy(CpvAccess(mempool));
2693 #endif
2694     PMI_Finalize();
2695     exit(0);
2696 }
2697
2698 void LrtsDrainResources()
2699 {
2700     if(mysize == 1) return;
2701     while (
2702 #if USE_OOB
2703            !SendBufferMsg(&smsg_oob_queue) ||
2704 #endif
2705            !SendBufferMsg(&smsg_queue))
2706     {
2707         if (useDynamicSMSG)
2708             PumpDatagramConnection();
2709         PumpNetworkSmsg();
2710         PumpLocalRdmaTransactions();
2711         SendRdmaMsg();
2712     }
2713     PMI_Barrier();
2714 }
2715
2716 void LrtsAbort(const char *message) {
2717     printf("CmiAbort is calling on PE:%d\n", myrank);
2718     CmiPrintStackTrace(0);
2719     PMI_Abort(-1, message);
2720 }
2721
2722 /**************************  TIMER FUNCTIONS **************************/
2723 #if CMK_TIMER_USE_SPECIAL
2724 /* MPI calls are not threadsafe, even the timer on some machines */
2725 static CmiNodeLock  timerLock = 0;
2726 static int _absoluteTime = 0;
2727 static int _is_global = 0;
2728 static struct timespec start_ts;
2729
2730 inline int CmiTimerIsSynchronized() {
2731     return 0;
2732 }
2733
2734 inline int CmiTimerAbsolute() {
2735     return _absoluteTime;
2736 }
2737
2738 double CmiStartTimer() {
2739     return 0.0;
2740 }
2741
2742 double CmiInitTime() {
2743     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
2744 }
2745
2746 void CmiTimerInit(char **argv) {
2747     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2748     if (_absoluteTime && CmiMyPe() == 0)
2749         printf("Charm++> absolute  timer is used\n");
2750     
2751     _is_global = CmiTimerIsSynchronized();
2752
2753
2754     if (_is_global) {
2755         if (CmiMyRank() == 0) {
2756             clock_gettime(CLOCK_MONOTONIC, &start_ts);
2757         }
2758     } else { /* we don't have a synchronous timer, set our own start time */
2759         CmiBarrier();
2760         CmiBarrier();
2761         CmiBarrier();
2762         clock_gettime(CLOCK_MONOTONIC, &start_ts);
2763     }
2764     CmiNodeAllBarrier();          /* for smp */
2765 }
2766
2767 /**
2768  * Since the timerLock is never created, and is
2769  * always NULL, then all the if-condition inside
2770  * the timer functions could be disabled right
2771  * now in the case of SMP.
2772  */
2773 double CmiTimer(void) {
2774     struct timespec now_ts;
2775     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2776     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2777         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2778 }
2779
2780 double CmiWallTimer(void) {
2781     struct timespec now_ts;
2782     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2783     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2784         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
2785 }
2786
2787 double CmiCpuTimer(void) {
2788     struct timespec now_ts;
2789     clock_gettime(CLOCK_MONOTONIC, &now_ts);
2790     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
2791         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
2792 }
2793
2794 #endif
2795 /************Barrier Related Functions****************/
2796
2797 int CmiBarrier()
2798 {
2799     int status;
2800
2801 #if CMK_SMP
2802     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2803     CmiNodeAllBarrier();
2804     if (CmiMyRank() == CmiMyNodeSize())
2805 #else
2806     if (CmiMyRank() == 0)
2807 #endif
2808     {
2809         /**
2810          *  The call of CmiBarrier is usually before the initialization
2811          *  of trace module of Charm++, therefore, the START_EVENT
2812          *  and END_EVENT are disabled here. -Chao Mei
2813          */
2814         /*START_EVENT();*/
2815         status = PMI_Barrier();
2816         GNI_RC_CHECK("PMI_Barrier", status);
2817         /*END_EVENT(10);*/
2818     }
2819     CmiNodeAllBarrier();
2820     return status;
2821
2822 }
2823 #if CMK_DIRECT
2824 #include "machine-cmidirect.c"
2825 #endif
2826 #if CMK_PERSISTENT_COMM
2827 #include "machine-persistent.c"
2828 #endif
2829
2830