call exit(0) after PMI_Finalize
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <errno.h>
16 #include <malloc.h>
17 #include "converse.h"
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21 /*Support for ++debug: */
22 #if defined(_WIN32) && ! defined(__CYGWIN__)
23 #include <windows.h>
24 #include <wincon.h>
25 #include <sys/types.h>
26 #include <sys/timeb.h>
27
28 //#define  USE_ONESIDED 1
29
30 #ifdef USE_ONESIDED
31 #include "onesided.h"
32 #endif
33 static void sleep(int secs) {
34     Sleep(1000*secs);
35 }
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #include "machine.h"
42
43 #include "pcqueue.h"
44
45 #define DEBUY_PRINT
46
47 #ifdef DEBUY_PRINT
48 #define PRINT_INFO(msg) {fprintf(stdout, "[%d] %s\n", CmiMyPe(), msg); fflush(stdout);}
49 #else
50 #define PRINT_INFO(msg)
51 #endif
52 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
53 /* If SMSG is not used */
54 #define FMA_PER_CORE  1024
55 #define FMA_BUFFER_SIZE 1024
56 /* If SMSG is used */
57 #define SMSG_MAX_MSG    1024
58 #define SMSG_MAX_CREDIT 16
59
60 #define MSGQ_MAXSIZE       4096
61 /* large message transfer with FMA or BTE */
62 #define LRTS_GNI_RDMA_THRESHOLD  16384
63
64 #define REMOTE_QUEUE_ENTRIES  1048576
65 #define LOCAL_QUEUE_ENTRIES 1024
66 /* SMSG is data message */
67 #define DATA_TAG        0x38
68 /* SMSG is a control message to initialize a BTE */
69 #define LMSG_INIT_TAG        0x39 
70 #define ACK_TAG         0x37
71
72 #define DEBUG
73 #ifdef GNI_RC_CHECK
74 #undef GNI_RC_CHECK
75 #endif
76 #ifdef DEBUG
77 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("%s; err=%s\n",msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
78 #else
79 #define GNI_RC_CHECK(msg,rc)
80 #endif
81
82 #ifdef USE_ONESIDED
83 onesided_hnd_t   onesided_hnd;
84 onesided_md_t    omdh;
85 #endif
86
87 static int useStaticSMSG   = 1;
88 static int useStaticMSGQ = 0;
89 static int useStaticFMA = 0;
90 static int mysize, myrank;
91 static gni_nic_handle_t      nic_hndl;
92
93
94 static void             **smsg_attr_ptr;
95 gni_msgq_attr_t         msgq_attrs;
96 gni_msgq_handle_t       msgq_handle;
97 gni_msgq_ep_attr_t      msgq_ep_attrs;
98 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
99 /* =====Beginning of Declarations of Machine Specific Variables===== */
100 static int cookie;
101 static int modes = 0;
102 static gni_cq_handle_t       rx_cqh = NULL;
103 static gni_cq_handle_t       remote_bte_cq_hndl = NULL;
104 static gni_cq_handle_t       tx_cqh = NULL;
105 static gni_ep_handle_t       *ep_hndl_array;
106
107 /* preallocated memory buffer for FMA for short message and control message */
108 static int              fma_buffer_len_eachcore = FMA_BUFFER_SIZE;
109 typedef struct {
110     gni_mem_handle_t mdh;
111     uint64_t addr;
112 } mdh_addr_t;
113
114 static mdh_addr_t            *fma_buffer_mdh_addr_base;
115 typedef struct msg_list
116 {
117     uint32_t destNode;
118     uint32_t size;
119     void *msg;
120     struct msg_list *next;
121     uint8_t tag;
122 }MSG_LIST;
123
124 typedef struct control_msg
125 {
126     uint64_t            source_addr;
127     int                 source;               /* source rank */
128     int                 length;
129     gni_mem_handle_t    source_mem_hndl;
130     struct control_msg *next;
131 }CONTROL_MSG;
132
133 /* reuse PendingMsg memory */
134 static CONTROL_MSG  *control_freelist=0;
135 static MSG_LIST     *msglist_freelist=0;
136
137 #define FreeControlMsg(d)       \
138   do {  \
139   (d)->next = control_freelist;\
140   control_freelist = d;\
141   } while (0); 
142
143 #define MallocControlMsg(d) \
144   d = control_freelist;\
145   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
146              _MEMCHECK(d);\
147   } else control_freelist = d->next;
148
149
150 #define FreeMsgList(d)       \
151   do {  \
152   (d)->next = msglist_freelist;\
153   msglist_freelist = d;\
154   } while (0); 
155
156 #define MallocMsgList(d) \
157   d = msglist_freelist;\
158   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
159              _MEMCHECK(d);\
160   } else msglist_freelist = d->next;
161
162
163 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
164 static MSG_LIST *buffered_smsg_head= 0;
165 static MSG_LIST *buffered_smsg_tail= 0;
166 /* SmsgSend return success but message sent is not confirmed by remote side */
167
168 static MSG_LIST *buffered_fma_head = 0;
169 static MSG_LIST *buffered_fma_tail = 0;
170
171 /* functions  */
172
173
174 static void* LrtsAllocRegister(int n_bytes, gni_mem_handle_t* mem_hndl);
175
176 static void
177 allgather(void *in,void *out, int len)
178 {
179     //PMI_Allgather is out of order
180     int i,rc, extend_len;
181     int  rank_index;
182     char *out_ptr, *out_ref;
183     char *in2;
184
185     extend_len = sizeof(int) + len;
186     in2 = (char*)malloc(extend_len);
187
188     memcpy(in2, &myrank, sizeof(int));
189     memcpy(in2+sizeof(int), in, len);
190
191     out_ptr = (char*)malloc(mysize*extend_len);
192
193     rc = PMI_Allgather(in2, out_ptr, extend_len);
194     GNI_RC_CHECK("allgather", rc);
195
196     out_ref = out;
197
198     for(i=0;i<mysize;i++) {
199         //rank index 
200         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
201         //copy to the rank index slot
202         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
203     }
204
205     free(out_ptr);
206     free(in2);
207
208 }
209
210 static unsigned int get_gni_nic_address(int device_id)
211 {
212     unsigned int address, cpu_id;
213     gni_return_t status;
214     int i, alps_dev_id=-1,alps_address=-1;
215     char *token, *p_ptr;
216
217     p_ptr = getenv("PMI_GNI_DEV_ID");
218     if (!p_ptr) {
219         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
220        
221         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
222     } else {
223         while ((token = strtok(p_ptr,":")) != NULL) {
224             alps_dev_id = atoi(token);
225             if (alps_dev_id == device_id) {
226                 break;
227             }
228             p_ptr = NULL;
229         }
230         CmiAssert(alps_dev_id != -1);
231         p_ptr = getenv("PMI_GNI_LOC_ADDR");
232         CmiAssert(p_ptr != NULL);
233         i = 0;
234         while ((token = strtok(p_ptr,":")) != NULL) {
235             if (i == alps_dev_id) {
236                 alps_address = atoi(token);
237                 break;
238             }
239             p_ptr = NULL;
240             ++i;
241         }
242         CmiAssert(alps_address != -1);
243         address = alps_address;
244     }
245     return address;
246 }
247
248 static uint8_t get_ptag(void)
249 {
250     char *p_ptr, *token;
251     uint8_t ptag;
252
253     p_ptr = getenv("PMI_GNI_PTAG");
254     CmiAssert(p_ptr != NULL);
255     token = strtok(p_ptr, ":");
256     ptag = (uint8_t)atoi(token);
257     return ptag;
258         
259 }
260
261 static uint32_t get_cookie(void)
262 {
263     uint32_t cookie;
264     char *p_ptr, *token;
265
266     p_ptr = getenv("PMI_GNI_COOKIE");
267     CmiAssert(p_ptr != NULL);
268     token = strtok(p_ptr, ":");
269     cookie = (uint32_t)atoi(token);
270
271     return cookie;
272 }
273
274 /*
275  * Local side event handler
276  *
277  */
278
279 void LocalEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
280 {
281
282     int type;
283
284     type = GNI_CQ_GET_TYPE(*cq_entry);
285
286     if(type == GNI_CQ_EVENT_TYPE_SMSG)
287     {
288
289     }
290 }
291
292 void RemoteSmsgEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
293 {
294 }
295
296 void RemoteBteEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
297 {
298 }
299 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
300 /* TODO: add any that are related */
301 /* =====End of Definitions of Message-Corruption Related Macros=====*/
302
303
304 #include "machine-lrts.h"
305 #include "machine-common-core.c"
306
307 /* Network progress function is used to poll the network when for
308    messages. This flushes receive buffers on some  implementations*/
309 #if CMK_MACHINE_PROGRESS_DEFINED
310 void CmiMachineProgressImpl() {
311 }
312 #endif
313
314
315 /* 
316  * The message can be copied to registered memory buffer and be sent
317  * This message memory can be registered to network. It depends on which one is cheaper
318  * register might be better when the msg is large
319  */
320
321 static int send_with_fma(int destNode, int size, char *msg)
322 {
323     gni_post_descriptor_t   pd;
324     gni_return_t status;
325     CONTROL_MSG *control_msg_tmp;
326     MSG_LIST *msg_tmp;
327     uint32_t              vmdh_index = -1;
328     if(buffered_fma_head != 0)
329     {
330         MallocMsgList(msg_tmp);
331         msg_tmp->msg = msg;
332         msg_tmp->destNode = destNode;
333         msg_tmp ->size = size;
334         buffered_smsg_tail->next = msg_tmp;
335         buffered_smsg_tail = msg_tmp;
336         return 0;
337     } else
338     {
339         pd.type            = GNI_POST_FMA_PUT;
340         pd.cq_mode         = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT;
341         pd.dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
342         pd.length          = 8;
343         pd.remote_addr     = fma_buffer_mdh_addr_base[destNode].addr + fma_buffer_len_eachcore*destNode;
344         pd.remote_mem_hndl = fma_buffer_mdh_addr_base[destNode].mdh;
345         if(size < FMA_BUFFER_SIZE)
346         {
347             /* send the message */
348             pd.local_addr      = (uint64_t) msg;
349         }else
350         {
351             /* construct a control message and send */
352             control_msg_tmp = (CONTROL_MSG *)malloc((int)sizeof(CONTROL_MSG));
353             GNI_MemRegister(nic_hndl, (uint64_t)msg, 
354                 size, remote_bte_cq_hndl,
355                 GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
356                 vmdh_index, &(control_msg_tmp->source_mem_hndl));
357             
358             control_msg_tmp->source = _Cmi_mynode;
359             control_msg_tmp->source_addr = (uint64_t)msg;
360             control_msg_tmp->length = size;
361         }
362         status = GNI_PostFma(ep_hndl_array[destNode], &pd);
363         if(status == GNI_RC_SUCCESS)
364             return 1;
365         else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE) 
366         {
367             MallocMsgList(msg_tmp);
368             msg_tmp->msg = control_msg_tmp;
369             msg_tmp->destNode = destNode;
370             msg_tmp ->size = size;
371             /* store into buffer fma_list and send later */
372             buffered_fma_head = msg_tmp;
373             buffered_fma_tail = msg_tmp;
374             return 0;
375         }
376     }
377 }
378
379 static int send_with_smsg(int destNode, int size, char *msg)
380 {
381     gni_return_t        status;
382     MSG_LIST            *msg_tmp;
383     CONTROL_MSG         *control_msg_tmp;
384     uint8_t             tag_data    = DATA_TAG;
385     uint8_t             tag_control = LMSG_INIT_TAG ;
386     uint32_t            vmdh_index  = -1;
387
388     /* No mailbox available, buffer this msg and its info */
389     if(buffered_smsg_head != 0)
390     {
391         MallocMsgList(msg_tmp);
392         msg_tmp->destNode = destNode;
393         if(size <=SMSG_MAX_MSG)
394         {
395             msg_tmp->size   = size;
396             msg_tmp->msg    = msg;
397             msg_tmp->tag    = tag_data;
398         }else
399         {
400             MallocControlMsg(control_msg_tmp);
401
402             status = GNI_MemRegister(nic_hndl, (uint64_t)msg, 
403                 size, rx_cqh,
404                 GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
405                 vmdh_index, &(control_msg_tmp->source_mem_hndl));
406             
407             GNI_RC_CHECK("MemRegister fails at ", status);
408             control_msg_tmp->source_addr    = (uint64_t)msg;
409             control_msg_tmp->source         = myrank;
410             control_msg_tmp->length         =size; 
411             msg_tmp->size                   = sizeof(CONTROL_MSG);
412             msg_tmp->msg                    = control_msg_tmp;
413             msg_tmp->tag                    = tag_control;
414         }
415         msg_tmp->next               = 0;
416         buffered_smsg_tail->next    = msg_tmp;
417         buffered_smsg_tail          = msg_tmp;
418         return 0;
419     }else
420     {
421         /* Can use SMSGSend */
422         if(size <= SMSG_MAX_MSG)
423         {
424             /* send the msg itself */
425             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], &size, sizeof(int), msg, size, 0, tag_data);
426             if (status == GNI_RC_SUCCESS)
427             {
428                 CmiFree(msg);
429                 return 1;
430             }
431             else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
432             {
433                 //CmiPrintf("[%d] data msg add to send queue\n", myrank);
434                 MallocMsgList(msg_tmp);
435                 msg_tmp->destNode   = destNode;
436                 msg_tmp->size       = size;
437                 msg_tmp->msg        = msg;
438                 msg_tmp->tag        = tag_data;
439                 msg_tmp->next       = 0;
440                 /* store into buffer smsg_list and send later */
441                 buffered_smsg_head  = msg_tmp;
442                 buffered_smsg_tail  = msg_tmp;
443                 return 0;
444             }
445             else
446                 GNI_RC_CHECK("GNI_SmsgSendWTag", status);
447         }else
448         {
449             /* construct a control message and send */
450             //control_msg_tmp = (CONTROL_MSG *)malloc(sizeof(CONTROL_MSG));
451             MallocControlMsg(control_msg_tmp);
452             control_msg_tmp->source_addr    = (uint64_t)msg;
453             control_msg_tmp->source         = myrank;
454             control_msg_tmp->length         = size; 
455             status = GNI_MemRegister(nic_hndl, (uint64_t)msg, 
456                 size, rx_cqh,
457                 GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
458                 vmdh_index, &(control_msg_tmp->source_mem_hndl));
459             
460             GNI_RC_CHECK("MemRegister fails at ", status);
461             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, tag_control);
462             if(status == GNI_RC_SUCCESS)
463             {
464                 FreeControlMsg(control_msg_tmp);
465                 //CmiPrintf("Control message sent bytes:%d on PE:%d, source=%d, add=%p, mem_hndl=%ld, %ld\n", sizeof(CONTROL_MSG), myrank, control_msg_tmp->source, (void*)control_msg_tmp->source_addr, (control_msg_tmp->source_mem_hndl).qword1, (control_msg_tmp->source_mem_hndl).qword2);
466                 return 1;
467             }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE) 
468             {
469                 //CmiPrintf("[%d] control msg add to send queue\n", myrank);
470                 MallocMsgList(msg_tmp);
471                 msg_tmp->destNode   = destNode;
472                 msg_tmp ->size      = sizeof(CONTROL_MSG);
473                 msg_tmp->msg        = control_msg_tmp;
474                 msg_tmp->tag        = tag_control;
475                 msg_tmp->next       = 0;
476                 /* store into buffer smsg_list and send later */
477                 buffered_smsg_head = msg_tmp;
478                 buffered_smsg_tail = msg_tmp;
479                 return 0;
480             }
481             else
482                 GNI_RC_CHECK("GNI_SmsgSendWTag", status);
483         }
484     }
485 }
486
487 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
488 {
489     //PRINT_INFO("Calling LrtsSend")
490     if(useStaticSMSG)
491     {
492         send_with_smsg(destNode, size, msg); 
493     }else
494     {
495         send_with_fma(destNode, size, msg); 
496     }
497     return 0;
498 }
499
500 static void LrtsPreCommonInit(int everReturn){}
501 static void LrtsPostCommonInit(int everReturn){}
502 static void LrtsDrainResources() /* used when exit */
503 {}
504
505 void LrtsPostNonLocal(){}
506 /* pooling CQ to receive network message */
507 static void PumpNetworkMsgs()
508 {
509     void                *header;
510     uint8_t             msg_tag, data_tag, control_tag, ack_tag;
511     gni_return_t        status;
512     uint64_t            inst_id;
513     gni_cq_entry_t      event_data;
514     int                 msg_nbytes;
515     void                *msg_data;
516     gni_mem_handle_t    msg_mem_hndl;
517     CONTROL_MSG         *request_msg;
518     gni_post_descriptor_t *pd;
519
520     data_tag        = DATA_TAG;
521     control_tag     = LMSG_INIT_TAG;
522     ack_tag         = ACK_TAG;
523     
524     while (1) {
525     status = GNI_CqGetEvent(rx_cqh, &event_data);
526     if(status == GNI_RC_SUCCESS)
527     {
528         inst_id = GNI_CQ_GET_INST_ID(event_data);
529     }else if (status == GNI_RC_NOT_DONE)
530     {
531         return;
532     }else
533     {
534         GNI_RC_CHECK("CQ Get event", status);
535     }
536   
537     msg_tag = GNI_SMSG_ANY_TAG;
538     status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
539
540     //CmiPrintf("++++## PumpNetwork Small msg is received on PE:%d message tag=%c\n", myrank, msg_tag);
541     if(status  == GNI_RC_SUCCESS)
542     {
543         /* copy msg out and then put into queue */
544         if(msg_tag == data_tag)
545         {
546             memcpy(&msg_nbytes, header, sizeof(int));
547             msg_data    = CmiAlloc(msg_nbytes);
548             memcpy(msg_data, (char*)header+sizeof(int), msg_nbytes);
549             handleOneRecvedMsg(msg_nbytes, msg_data);
550         }else if(msg_tag == control_tag)
551         {
552             /* initial a get to transfer data from the sender side */
553             request_msg = (CONTROL_MSG *) header;
554             msg_data = LrtsAllocRegister(request_msg->length, &msg_mem_hndl); //need align checking
555             pd = (gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t));
556             if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
557                 pd->type            = GNI_POST_FMA_GET;
558             else
559                 pd->type            = GNI_POST_RDMA_GET;
560
561             pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
562             pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
563             pd->length          = request_msg->length;
564             pd->local_addr      = (uint64_t) msg_data;
565             pd->local_mem_hndl  = msg_mem_hndl; 
566             pd->remote_addr     = request_msg->source_addr;
567             pd->remote_mem_hndl = request_msg->source_mem_hndl;
568             pd->src_cq_hndl     = 0;     /* tx_cqh;  */
569             pd->rdma_mode       = 0;
570
571            // CmiPrintf("source=%d, addr=%p, handler=%ld,%ld \n", request_msg->source, (void*)pd.remote_addr, (request_msg->source_mem_hndl).qword1, (request_msg->source_mem_hndl).qword2);
572             if(pd->type == GNI_POST_RDMA_GET) 
573                 status = GNI_PostRdma(ep_hndl_array[request_msg->source], pd);
574             else
575                 status = GNI_PostFma(ep_hndl_array[request_msg->source],  pd);
576             GNI_RC_CHECK("post ", status);
577            
578             //CmiPrintf("post status=%s on PE:%d\n", gni_err_str[status], myrank);
579             if(status = GNI_RC_SUCCESS)
580             {
581                 /* put into receive buffer queue */
582             }else
583             {
584             }
585         }else if(msg_tag == ack_tag){
586             /* Get is done, release message . Now put is not used yet*/
587             request_msg = (CONTROL_MSG *) header;
588             //CmiPrintf("++++## ACK msg is received on PE:%d message size=%d, addr=%p\n", myrank, request_msg->length, (void*)request_msg->source_addr);
589             CmiFree((void*)request_msg->source_addr);
590             //CmiPrintf("++++## release ACK msg is received on PE:%d message size=%d\n", myrank, request_msg->length);
591         }else{
592             CmiPrintf("weird tag problem\n");
593             CmiAbort("Unknown tag\n");
594         }
595         GNI_SmsgRelease(ep_hndl_array[inst_id]);
596     }  //else not match smsg, maybe larger message
597     }   // end of while loop
598 }
599
600 /* Check whether message send or get is confirmed by remote */
601 static void PumpLocalTransactions()
602 {
603     gni_cq_entry_t ev;
604     gni_return_t status;
605     uint64_t type, inst_id, data_addr;
606     uint8_t         ack_tag = ACK_TAG;
607     gni_post_descriptor_t *tmp_pd;
608     MSG_LIST *msg_tmp;
609     //gni_post_descriptor_t   ack_pd;
610     MSG_LIST  *ptr;
611     CONTROL_MSG *ack_msg_tmp;
612
613     while (1) 
614     {
615
616         status = GNI_CqGetEvent(tx_cqh, &ev);
617         if(status == GNI_RC_SUCCESS)
618         {
619             type        = GNI_CQ_GET_TYPE(ev);
620             inst_id     = GNI_CQ_GET_INST_ID(ev);
621             data_addr   = GNI_CQ_GET_DATA(ev);
622         }else if (status == GNI_RC_NOT_DONE)
623         {
624             return;
625         }else
626         {
627             GNI_RC_CHECK("CQ Get event", status);
628         }
629
630         if(type == GNI_CQ_EVENT_TYPE_POST)
631         {
632             status = GNI_GetCompleted(tx_cqh, ev, &tmp_pd);
633             GNI_RC_CHECK("Local CQ completed ", status);
634             //Message is sent, free message , put is not used now
635             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
636             {
637                 CmiFree((void *)tmp_pd->local_addr);
638             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
639             {
640                 /* Send an ACK to remote side */
641                 //CmiPrintf("\nPE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, SIZEFIELD((void*)tmp_pd->local_addr), tmp_pd->length, tmp_pd->remote_addr); 
642                 ////CmiPrintf("\n+PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, remote_length , tmp_pd->length, (void*)remote_addr); 
643                 MallocControlMsg(ack_msg_tmp);
644                 ack_msg_tmp->source = myrank;
645                 //CmiPrintf("\n++PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, SIZEFIELD((void*)tmp_pd->local_addr), tmp_pd->length, tmp_pd->remote_addr); 
646                 ////CmiPrintf("\n+++PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, remote_length , tmp_pd->length, (void*)remote_addr); 
647                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
648                 ack_msg_tmp->length=tmp_pd->length; 
649                 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
650                 //CmiPrintf("PE:%d sending ACK back addr=%p \n", myrank, ack_msg_tmp->source_addr); 
651            
652                 if(buffered_smsg_head!=0)
653                 {
654                     //CmiPrintf("[%d] PumpLocalTransactions: smsg buffered.\n", myrank);
655                     MallocMsgList(msg_tmp);
656                     msg_tmp->msg = ack_msg_tmp;
657                     msg_tmp ->size = sizeof(CONTROL_MSG);
658                     msg_tmp->tag = ack_tag;
659                     msg_tmp->destNode = inst_id;
660                     msg_tmp->next = 0;
661                     buffered_smsg_tail->next = msg_tmp;
662                     buffered_smsg_tail = msg_tmp;
663                 }else
664                 {
665                     //CmiPrintf("PE:%d sending ACK back addr=%p \n", myrank, ack_msg_tmp->source_addr); 
666                     status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, ack_tag);
667                     if(status == GNI_RC_SUCCESS)
668                     {
669                         FreeControlMsg(ack_msg_tmp);
670                     }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
671                     {
672                         // CmiPrintf("[%d] PumpLocalTransactions: ack smsg buffered.\n", myrank);
673                         MallocMsgList(msg_tmp);
674                         msg_tmp->msg = ack_msg_tmp;
675                         msg_tmp ->size = sizeof(CONTROL_MSG);
676                         msg_tmp->tag = ack_tag;
677                         msg_tmp->next = 0;
678                         msg_tmp->destNode = inst_id;
679                         buffered_smsg_head = msg_tmp;
680                         buffered_smsg_tail = msg_tmp;
681                     }
682                     else
683                         GNI_RC_CHECK("GNI_SmsgSendWTag", status);
684                 }
685                 handleOneRecvedMsg(SIZEFIELD((void*)tmp_pd->local_addr), (void*)tmp_pd->local_addr); 
686             }
687         }
688     }   /* end of while loop */
689 }
690
691 static void SendBufferMsg()
692 {
693     MSG_LIST        *ptr;
694     uint8_t         tag_data, tag_control, tag_ack;
695     gni_return_t     status;
696
697     tag_data    = DATA_TAG;
698     tag_control = LMSG_INIT_TAG;
699     tag_ack     = ACK_TAG;
700     /* can add flow control here to control the number of messages sent before handle message */
701     while(buffered_smsg_head != 0)
702     {
703         if(useStaticSMSG)
704         {
705             ptr = buffered_smsg_head;
706             if(ptr->tag == tag_data)
707             {
708                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], &(ptr->size), (uint32_t)sizeof(int), ptr->msg, ptr->size, 0, tag_data);
709                 if(status == GNI_RC_SUCCESS)
710                     CmiFree(ptr->msg);
711             }else if(ptr->tag ==tag_control)
712             {
713                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, tag_control);
714                 if(status == GNI_RC_SUCCESS)
715                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
716             }else if (ptr->tag == tag_ack)
717             {
718                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, tag_ack);
719                 if(status == GNI_RC_SUCCESS)
720                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
721             }
722         } else if(useStaticMSGQ)
723         {
724             CmiPrintf("MSGQ Send not done\n");
725         }else
726         {
727             CmiPrintf("FMA Send not done\n");
728         }
729         if(status == GNI_RC_SUCCESS)
730         {
731             buffered_smsg_head= buffered_smsg_head->next;
732             FreeMsgList(ptr);
733         }else
734             break;
735     }
736 }
737 static void LrtsAdvanceCommunication()
738 {
739     /*  Receive Msg first */
740     //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
741     PumpNetworkMsgs();
742     /* Release Sent Msg */
743     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
744     PumpLocalTransactions();
745     //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
746     /* Send buffered Message */
747     SendBufferMsg();
748 }
749
750 void remoteEventHandle(gni_cq_entry_t *event_data, void *context)
751 {
752     gni_return_t status, source_data, source_control;
753     uint64_t            source;
754     void                *header;
755     uint8_t             tag_data;
756     uint8_t             tag_control;
757
758     tag_data = DATA_TAG;
759     tag_control = LMSG_INIT_TAG;
760     /* pool the CQ to check which smsg endpoint to get the data */
761     //status = GNI_CqGetEvent(remote_cq_hndl, &event_data);
762         
763     /* check whether it is data or control information */
764     source = GNI_CQ_GET_SOURCE(*event_data);
765
766     if((status = GNI_SmsgGetNextWTag(ep_hndl_array[source], &header, &tag_data)) == GNI_RC_SUCCESS)
767     {
768         /* copy msg out and then put into queue */
769
770     } else if ((status = GNI_SmsgGetNextWTag(ep_hndl_array[source], &header, &tag_control)) == GNI_RC_SUCCESS)
771     {
772         /* initial a get to transfer data from the sender side */
773     } else
774     {
775
776     }
777
778 }
779
780 /* if FMA is used to transfer small messages, static allocation*/
781 static void _init_smallMsgwithFma(int size)
782 {
783     char            *fma_buffer;
784     gni_mem_handle_t      fma_buffer_mdh_addr;
785     mdh_addr_t            my_mdh_addr;
786     gni_return_t status;
787     
788     fma_buffer = (char *)calloc(FMA_PER_CORE*size, 1);
789     CmiAssert(fma_buffer != NULL);
790
791     status = GNI_MemRegister(nic_hndl, (uint64_t)fma_buffer,
792         FMA_PER_CORE*size, rx_cqh, 
793         GNI_MEM_READWRITE | GNI_MEM_USE_GART, -1,
794         &fma_buffer_mdh_addr);
795
796     GNI_RC_CHECK("Memregister DMA ", status);
797     /* Gather up all of the mdh's over the socket network, 
798      * * this also serves as a barrier */
799     fma_buffer_mdh_addr_base = (mdh_addr_t*)malloc(size* sizeof(mdh_addr_t));
800     CmiAssert(fma_buffer_mdh_addr_base);
801
802     my_mdh_addr.addr = (uint64_t)fma_buffer;
803     my_mdh_addr.mdh = fma_buffer_mdh_addr;
804     allgather(&my_mdh_addr, fma_buffer_mdh_addr_base, sizeof(mdh_addr_t));      
805
806 }
807
808 static void _init_static_smsg()
809 {
810     gni_smsg_attr_t      *smsg_attr;
811     gni_smsg_attr_t      *smsg_attr_vec;
812     unsigned int         smsg_memlen;
813     gni_mem_handle_t     my_smsg_mdh_mailbox;
814     register    int      i;
815     gni_return_t status;
816     uint32_t              vmdh_index = -1;
817
818      smsg_attr = (gni_smsg_attr_t *)malloc(mysize*sizeof(gni_smsg_attr_t));
819     _MEMCHECK(smsg_attr);
820
821     smsg_attr_ptr = malloc(sizeof(void*) *mysize);
822     for(i=0; i<mysize; i++)
823     {
824         if(i==myrank)
825             continue;
826         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX;
827         smsg_attr[i].mbox_offset = 0;
828         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
829         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
830         status = GNI_SmsgBufferSizeNeeded(&smsg_attr[i], &smsg_memlen);
831         GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
832
833         smsg_attr_ptr[i] = memalign(64, smsg_memlen);
834         _MEMCHECK(smsg_attr_ptr[i]);
835         bzero(smsg_attr_ptr[i], smsg_memlen);
836         status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_attr_ptr[i],
837             smsg_memlen, rx_cqh,
838             GNI_MEM_READWRITE | GNI_MEM_USE_GART | GNI_MEM_PI_FLUSH,   
839             vmdh_index,
840             &my_smsg_mdh_mailbox);
841
842         GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
843       
844         smsg_attr[i].msg_buffer = smsg_attr_ptr[i];
845         smsg_attr[i].buff_size = smsg_memlen;
846         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
847     }
848     smsg_attr_vec = (gni_smsg_attr_t*)malloc(mysize * mysize * sizeof(gni_smsg_attr_t));
849     CmiAssert(smsg_attr_vec);
850    
851     allgather(smsg_attr, smsg_attr_vec,  mysize*sizeof(gni_smsg_attr_t));
852     //MPI_Alltoall(smsg_attr, sizeof(gni_smsg_attr_t), MPI_BYTE, smsg_attr_vec, sizeof(gni_smsg_attr_t), MPI_BYTE, MPI_COMM_WORLD);
853     for(i=0; i<mysize; i++)
854     {
855         if (myrank == i) continue;
856         /* initialize the smsg channel */
857         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &smsg_attr_vec[i*mysize+myrank]);
858         GNI_RC_CHECK("SMSG Init", status);
859     } //end initialization
860     free(smsg_attr);
861     free(smsg_attr_vec);
862
863
864 static void _init_static_msgq()
865 {
866     gni_return_t status;
867     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
868     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
869     msgq_attrs.smsg_q_sz = 1;
870     msgq_attrs.rcv_pool_sz = 1;
871     msgq_attrs.num_msgq_eps = 2;
872     msgq_attrs.nloc_insts = 8;
873     msgq_attrs.modes = 0;
874     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
875
876     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
877     GNI_RC_CHECK("MSGQ Init", status);
878
879
880 }
881 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
882 {
883     register int            i;
884     int                     rc;
885     int                     device_id = 0;
886     unsigned int            remote_addr;
887     gni_cdm_handle_t        cdm_hndl;
888     gni_return_t            status = GNI_RC_SUCCESS;
889     uint32_t                vmdh_index = -1;
890     uint8_t                 ptag;
891     unsigned int            local_addr, *MPID_UGNI_AllAddr;
892     int                     first_spawned;
893
894     void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
895     void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
896     void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
897     
898     //useStaticSMSG = CmiGetArgFlag(*argv, "+useStaticSmsg");
899     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
900     
901     status = PMI_Init(&first_spawned);
902     GNI_RC_CHECK("PMI_Init", status);
903
904     status = PMI_Get_size(&mysize);
905     GNI_RC_CHECK("PMI_Getsize", status);
906
907     status = PMI_Get_rank(&myrank);
908     GNI_RC_CHECK("PMI_getrank", status);
909
910     *myNodeID = myrank;
911     *numNodes = mysize;
912   
913     if(myrank == 0)
914     {
915         printf("Charm++> Running on Gemini (GNI)\n");
916     }
917 #ifdef USE_ONESIDED
918     onesided_init(NULL, &onesided_hnd);
919
920     // this is a GNI test, so use the libonesided bypass functionality
921     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
922     local_addr = gniGetNicAddress();
923 #else
924     ptag = get_ptag();
925     cookie = get_cookie();
926     
927     //Create and attach to the communication  domain */
928     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
929     GNI_RC_CHECK("GNI_CdmCreate", status);
930     //* device id The device id is the minor number for the device
931     //that is assigned to the device by the system when the device is created.
932     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
933     //where X is the device number 0 default 
934     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
935     GNI_RC_CHECK("GNI_CdmAttach", status);
936     local_addr = get_gni_nic_address(0);
937 #endif
938     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
939     _MEMCHECK(MPID_UGNI_AllAddr);
940     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
941     /* create the local completion queue */
942     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
943     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &tx_cqh);
944     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, &local_event_handler, NULL, &tx_cqh);
945     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
946     
947     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
948
949     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rx_cqh);
950     GNI_RC_CHECK("Create CQ (rx)", status);
951     
952     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &remote_bte_cq_hndl);
953     //GNI_RC_CHECK("Create BTE CQ", status);
954
955     /* create the endpoints. they need to be bound to allow later CQWrites to them */
956     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
957     _MEMCHECK(ep_hndl_array);
958
959     for (i=0; i<mysize; i++) {
960         if(i == myrank) continue;
961         status = GNI_EpCreate(nic_hndl, tx_cqh, &ep_hndl_array[i]);
962         GNI_RC_CHECK("GNI_EpCreate ", status);   
963         remote_addr = MPID_UGNI_AllAddr[i];
964         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
965         GNI_RC_CHECK("GNI_EpBind ", status);   
966     }
967     /* Depending on the number of cores in the job, decide different method */
968     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
969     if(useStaticSMSG == 1)
970     {
971         _init_static_smsg(mysize);
972     }else if(useStaticMSGQ == 1)
973     {
974         _init_static_msgq();
975     }else if( useStaticFMA == 1)
976     {
977         _init_smallMsgwithFma(mysize);
978     }
979     free(MPID_UGNI_AllAddr);
980     //PRINT_INFO("\nDone with LrtsInit")
981 }
982
983 #define ALIGNBUF                64
984
985 void* LrtsAlloc(int n_bytes, int header)
986 {
987     if(n_bytes <= SMSG_MAX_MSG)
988     {
989         int totalsize = n_bytes+header;
990         return malloc(totalsize);
991 /*
992     }else if(n_bytes <= LRTS_GNI_RDMA_THRESHOLD)
993     {
994         return malloc(n_bytes);
995 */
996     }else 
997     {
998         CmiAssert(header <= ALIGNBUF);
999         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1000         return res + ALIGNBUF - header;
1001     }
1002 }
1003
1004 void  LrtsFree(void *msg)
1005 {
1006     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1007     if (size <= SMSG_MAX_MSG)
1008       free(msg);
1009     else
1010       free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1011 }
1012
1013 static void* LrtsAllocRegister(int n_bytes, gni_mem_handle_t* mem_hndl)
1014 {
1015     void *ptr;
1016     gni_return_t status;
1017     ptr = CmiAlloc(n_bytes);
1018     _MEMCHECK(ptr);
1019     status = GNI_MemRegister(nic_hndl, (uint64_t)ptr,
1020         n_bytes, rx_cqh, 
1021         GNI_MEM_READWRITE | GNI_MEM_USE_GART, -1,
1022         mem_hndl);
1023
1024     GNI_RC_CHECK("GNI_MemRegister in LrtsAlloc", status);
1025     return ptr;
1026
1027 }
1028
1029 static void LrtsExit()
1030 {
1031     /* free memory ? */
1032     PMI_Finalize();
1033     exit(0);
1034 }
1035
1036 void CmiAbort(const char *message) {
1037
1038     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1039     PMI_Abort(-1, message);
1040 }
1041
1042 #if 0
1043 /**************************  TIMER FUNCTIONS **************************/
1044 #if CMK_TIMER_USE_SPECIAL
1045 /* MPI calls are not threadsafe, even the timer on some machines */
1046 static CmiNodeLock  timerLock = 0;
1047 static int _absoluteTime = 0;
1048 static double starttimer = 0;
1049 static int _is_global = 0;
1050
1051 int CmiTimerIsSynchronized() {
1052     int  flag;
1053     void *v;
1054
1055     /*  check if it using synchronized timer */
1056     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1057         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1058     if (flag) {
1059         _is_global = *(int*)v;
1060         if (_is_global && CmiMyPe() == 0)
1061             printf("Charm++> MPI timer is synchronized\n");
1062     }
1063     return _is_global;
1064 }
1065
1066 int CmiTimerAbsolute() {
1067     return _absoluteTime;
1068 }
1069
1070 double CmiStartTimer() {
1071     return 0.0;
1072 }
1073
1074 double CmiInitTime() {
1075     return starttimer;
1076 }
1077
1078 void CmiTimerInit(char **argv) {
1079     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1080     if (_absoluteTime && CmiMyPe() == 0)
1081         printf("Charm++> absolute MPI timer is used\n");
1082     _is_global = CmiTimerIsSynchronized();
1083
1084     if (_is_global) {
1085         if (CmiMyRank() == 0) {
1086             double minTimer;
1087 #if CMK_TIMER_USE_XT3_DCLOCK
1088             starttimer = dclock();
1089 #else
1090             starttimer = MPI_Wtime();
1091 #endif
1092
1093             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1094                 MPI_COMM_WORLD );
1095             starttimer = minTimer;
1096         }
1097     } else { /* we don't have a synchronous timer, set our own start time */
1098         CmiBarrier();
1099         CmiBarrier();
1100         CmiBarrier();
1101 #if CMK_TIMER_USE_XT3_DCLOCK
1102         starttimer = dclock();
1103 #else
1104         starttimer = MPI_Wtime();
1105 #endif
1106     }
1107
1108     CmiNodeAllBarrier();          /* for smp */
1109 }
1110
1111 /**
1112  * Since the timerLock is never created, and is
1113  * always NULL, then all the if-condition inside
1114  * the timer functions could be disabled right
1115  * now in the case of SMP.
1116  */
1117 double CmiTimer(void) {
1118     double t;
1119 #if CMK_TIMER_USE_XT3_DCLOCK
1120     t = dclock();
1121 #else
1122     t = MPI_Wtime();
1123 #endif
1124     return _absoluteTime?t: (t-starttimer);
1125 }
1126
1127 double CmiWallTimer(void) {
1128     double t;
1129 #if CMK_TIMER_USE_XT3_DCLOCK
1130     t = dclock();
1131 #else
1132     t = MPI_Wtime();
1133 #endif
1134     return _absoluteTime? t: (t-starttimer);
1135 }
1136
1137 double CmiCpuTimer(void) {
1138     double t;
1139 #if CMK_TIMER_USE_XT3_DCLOCK
1140     t = dclock() - starttimer;
1141 #else
1142     t = MPI_Wtime() - starttimer;
1143 #endif
1144     return t;
1145 }
1146
1147 #endif
1148 #endif
1149 /************Barrier Related Functions****************/
1150
1151 int CmiBarrier()
1152 {
1153     int status;
1154     status = PMI_Barrier();
1155     return status;
1156
1157 }