fixed a bug in pumpMsgs.
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <errno.h>
16 #include <malloc.h>
17 #include "converse.h"
18
19 #include "gni_pub.h"
20 #include "mpi.h"
21 #include "pmi.h"
22 /*Support for ++debug: */
23 #if defined(_WIN32) && ! defined(__CYGWIN__)
24 #include <windows.h>
25 #include <wincon.h>
26 #include <sys/types.h>
27 #include <sys/timeb.h>
28 static void sleep(int secs) {
29     Sleep(1000*secs);
30 }
31 #else
32 #include <unistd.h> /*For getpid()*/
33 #endif
34 #include <stdlib.h> /*For sleep()*/
35
36 #include "machine.h"
37
38 #include "pcqueue.h"
39
40 #define DEBUY_PRINT
41
42 #ifdef DEBUY_PRINT
43 #define PRINT_INFO(msg) {fprintf(stdout, "[%d] %s\n", CmiMyPe(), msg); fflush(stdout);}
44 #else
45 #define PRINT_INFO(msg)
46 #endif
47 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
48 static int useSMSG   = 1;
49 static int useDynamicSmsg = 0;
50 static int useMsgq = 0;
51 static int size, rank;
52 #define FMA_PER_CORE  1024
53 #define FMA_BUFFER_SIZE 1024
54 #define SMSG_PER_MSG    1024
55 #define SMSG_MAX_CREDIT 16
56 #define SMSG_BUFFER_SIZE        102400
57 #define FMA_BTE_THRESHOLD  4096
58 #define MSGQ_MAXSIZE       4096
59
60 #define DEBUG
61
62 #ifdef GNI_RC_CHECK
63 #undef GNI_RC_CHECK
64 #endif
65
66 #ifdef DEBUG
67 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("%s; err=%s\n",msg,gni_err_str[rc]); exit(911); } } while(0)
68 #else
69 #define GNI_RC_CHECK(msg,rc)
70 #endif
71
72 static gni_nic_handle_t      nic_hndl;
73 static unsigned int         *MPID_UGNI_AllAddr;
74 static gni_smsg_attr_t      *smsg_attr;
75 static gni_smsg_attr_t      *smsg_attr_vec;
76 static char                 *smsg_mem_buffer = NULL;
77 static uint32_t             smsg_memlen;
78
79 gni_msgq_attr_t         msgq_attrs;
80 gni_msgq_handle_t       msgq_handle;
81 gni_msgq_ep_attr_t      msgq_ep_attrs;
82 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
83
84 #define REMOTE_QUEUE_ENTRIES  1048576
85 #define LOCAL_QUEUE_ENTRIES 1024
86 /* SMSG is data message */
87 #define DATA_TAG        0
88 /* SMSG is a control message to initialize a BTE */
89 #define LMSG_INIT_TAG        1 
90 /* =====Beginning of Declarations of Machine Specific Variables===== */
91 static int cookie;
92 static int modes = 0;
93 static gni_cq_handle_t       rx_cqh = NULL;
94 static gni_cq_handle_t       remote_bte_cq_hndl = NULL;
95 static gni_cq_handle_t       tx_cqh = NULL;
96 static gni_ep_handle_t       *ep_hndl_array;
97
98 /* preallocated memory buffer for FMA for short message and control message */
99 static char            *fma_buffer;
100 static int              fma_buffer_len_eachcore = FMA_BUFFER_SIZE;
101 typedef struct {
102     gni_mem_handle_t mdh;
103     uint64_t addr;
104 } mdh_addr_t;
105
106 static mdh_addr_t            my_mdh_addr;
107 static mdh_addr_t            *fma_buffer_mdh_addr_base;
108 static gni_mem_handle_t      fma_buffer_mdh_addr;
109 static gni_mem_handle_t      my_smsg_mdh_mailbox;
110 /* =====Beginning of Declarations of Machine Specific Functions===== */
111
112 typedef struct msg_list
113 {
114     int destNode;
115     int size;
116     void *msg;
117     struct msg_list *next;
118
119 }MSG_LIST;
120
121 typedef struct control_msg
122 {
123     int             source;   //source rank
124     uint64_t        source_addr;
125     gni_mem_handle_t    source_mem_hndl;
126     uint64_t            length;
127 }CONTROL_MSG;
128
129 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
130 static MSG_LIST *buffered_smsg_head= 0;
131 static MSG_LIST *buffered_smsg_tail= 0;
132 /* SmsgSend return success but message sent is not confirmed by remote side */
133
134 static MSG_LIST *buffered_fma_head = 0;
135 static MSG_LIST *buffered_fma_tail = 0;
136
137 static unsigned int get_gni_nic_address(int device_id)
138 {
139     unsigned int address, cpu_id;
140     gni_return_t status;
141     int i, alps_dev_id=-1,alps_address=-1;
142     char *token, *p_ptr;
143
144     p_ptr = getenv("PMI_GNI_DEV_ID");
145     if (!p_ptr) {
146         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
147        
148         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
149     } else {
150         while ((token = strtok(p_ptr,":")) != NULL) {
151             alps_dev_id = atoi(token);
152             if (alps_dev_id == device_id) {
153                 break;
154             }
155             p_ptr = NULL;
156         }
157         CmiAssert(alps_dev_id != -1);
158         p_ptr = getenv("PMI_GNI_LOC_ADDR");
159         CmiAssert(p_ptr != NULL);
160         i = 0;
161         while ((token = strtok(p_ptr,":")) != NULL) {
162             if (i == alps_dev_id) {
163                 alps_address = atoi(token);
164                 break;
165             }
166             p_ptr = NULL;
167             ++i;
168         }
169         CmiAssert(alps_address != -1);
170         address = alps_address;
171     }
172     return address;
173 }
174 static void* gather_nic_addresses(void)
175 {
176     unsigned int local_addr,*alladdrs;
177     int size,rc;
178     size_t addr_len;
179
180     MPI_Comm_size(MPI_COMM_WORLD,&size);
181
182     /*
183      * * just assume a single gemini device
184      */
185     local_addr = get_gni_nic_address(0);
186
187     addr_len = sizeof(unsigned int);
188
189     alladdrs = (unsigned int *)malloc(addr_len * size);
190     CmiAssert(alladdrs != NULL);
191
192     MPI_Allgather(&local_addr, addr_len, MPI_BYTE, alladdrs, addr_len, MPI_BYTE, MPI_COMM_WORLD);
193
194     return (void *)alladdrs;
195
196 }
197
198 static uint8_t get_ptag(void)
199 {
200     char *p_ptr, *token;
201     uint8_t ptag;
202
203     p_ptr = getenv("PMI_GNI_PTAG");
204     CmiAssert(p_ptr != NULL);
205     token = strtok(p_ptr, ":");
206     ptag = (uint8_t)atoi(token);
207     return ptag;
208         
209 }
210
211 static uint32_t get_cookie(void)
212 {
213     uint32_t cookie;
214     char *p_ptr, *token;
215
216     p_ptr = getenv("PMI_GNI_COOKIE");
217     CmiAssert(p_ptr != NULL);
218     token = strtok(p_ptr, ":");
219     cookie = (uint32_t)atoi(token);
220
221     return cookie;
222 }
223
224 /*
225  * Local side event handler
226  *
227  */
228
229 void LocalEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
230 {
231
232     int type;
233
234     type = GNI_CQ_GET_TYPE(*cq_entry);
235
236     if(type == GNI_CQ_EVENT_TYPE_SMSG)
237     {
238
239     }
240 }
241
242 void RemoteSmsgEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
243 {
244 }
245
246 void RemoteBteEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
247 {
248 }
249 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
250 /* TODO: add any that are related */
251 /* =====End of Definitions of Message-Corruption Related Macros=====*/
252
253
254 #include "machine-common.h"
255 #include "machine-common.c"
256
257 /* Network progress function is used to poll the network when for
258    messages. This flushes receive buffers on some  implementations*/
259 #if CMK_MACHINE_PROGRESS_DEFINED
260 void CmiMachineProgressImpl() {
261 }
262 #endif
263
264
265 /* 
266  * The message can be copied to registered memory buffer and be sent
267  * This message memory can be registered to network. It depends on which one is cheaper
268  *
269  * register might be better when the msg is large
270  */
271
272 static int send_with_fma(int destNode, int size, char *msg)
273 {
274     gni_post_descriptor_t   pd;
275     gni_return_t status;
276     CONTROL_MSG *control_msg_tmp;
277     MSG_LIST *msg_tmp;
278     if(buffered_fma_head != 0)
279     {
280         msg_tmp = (MSG_LIST *)LrtsAlloc(sizeof(MSG_LIST));
281         msg_tmp->msg = msg;
282         msg_tmp->destNode = destNode;
283         msg_tmp ->size = size;
284         buffered_smsg_tail->next = msg_tmp;
285         buffered_smsg_tail = msg_tmp;
286         return 0;
287     } else
288     {
289         pd.type            = GNI_POST_FMA_PUT;
290         pd.cq_mode         = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT;
291         pd.dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
292         pd.length          = 8;
293         pd.remote_addr     = fma_buffer_mdh_addr_base[destNode].addr + fma_buffer_len_eachcore*destNode;
294         pd.remote_mem_hndl = fma_buffer_mdh_addr_base[destNode].mdh;
295         if(size < FMA_BUFFER_SIZE)
296         {
297             /* send the message */
298             pd.local_addr      = (uint64_t) msg;
299         }else
300         {
301             /* construct a control message and send */
302             control_msg_tmp = (CONTROL_MSG *)LrtsAlloc((int)sizeof(CONTROL_MSG));
303             GNI_MemRegister(nic_hndl, (uint64_t)msg, 
304                 size, remote_bte_cq_hndl,
305                 GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
306                 -1, &(control_msg_tmp->source_mem_hndl));
307             
308             control_msg_tmp->source = _Cmi_mynode;
309             control_msg_tmp->source_addr = (uint64_t)msg;
310             msg_tmp = (MSG_LIST *)LrtsAlloc(sizeof(MSG_LIST));
311             msg_tmp->msg = control_msg_tmp;
312             msg_tmp->destNode = destNode;
313             msg_tmp ->size = size;
314         }
315         status = GNI_PostFma(ep_hndl_array[destNode], &pd);
316         if(status == GNI_RC_SUCCESS)
317             return 1;
318         else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE) 
319         {
320             /* store into buffer fma_list and send later */
321             buffered_fma_head = msg_tmp;
322             buffered_fma_tail = msg_tmp;
323             return 0;
324         }
325     }
326 }
327 static int send_with_smsg(int destNode, int size, char *msg)
328 {
329     gni_return_t status;
330     MSG_LIST *msg_tmp;
331     CONTROL_MSG *control_msg_tmp;
332     uint8_t             tag_data = DATA_TAG;
333     uint8_t             tag_control= LMSG_INIT_TAG ;
334
335     /* No mailbox available */
336     if(buffered_smsg_head != 0)
337     {
338         msg_tmp = (MSG_LIST *)LrtsAlloc(sizeof(MSG_LIST));
339         msg_tmp->msg = msg;
340         msg_tmp->destNode = destNode;
341         msg_tmp ->size = size;
342         buffered_smsg_tail->next = msg_tmp;
343         buffered_smsg_tail = msg_tmp;
344         return 0;
345     }else
346     {
347         /* Can use SMSGSend */
348         if(size < SMSG_PER_MSG)
349         {
350             /* send the msg itself */
351             msg_tmp = (MSG_LIST *)LrtsAlloc(sizeof(MSG_LIST));
352             msg_tmp->msg = msg;
353             msg_tmp->destNode = destNode;
354             msg_tmp ->size = size;
355             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], &(msg_tmp->size), (uint32_t)sizeof(int), msg, (uint32_t)size, 0, tag_data);
356         }else
357         {
358             /* construct a control message and send */
359             control_msg_tmp = (CONTROL_MSG *)LrtsAlloc(sizeof(CONTROL_MSG));
360             
361             GNI_MemRegister(nic_hndl, (uint64_t)msg, 
362                 size, remote_bte_cq_hndl,
363                 GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
364                 -1, &(control_msg_tmp->source_mem_hndl));
365             
366             control_msg_tmp->source = _Cmi_mynode;
367             control_msg_tmp->source_addr = (uint64_t)msg;
368         
369             msg_tmp = (MSG_LIST *)LrtsAlloc(sizeof(MSG_LIST));
370             msg_tmp->msg = control_msg_tmp;
371             msg_tmp->destNode = destNode;
372             msg_tmp ->size = size;
373             
374             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, tag_control);
375         }
376         if(status == GNI_RC_SUCCESS)
377         {
378             return 1;
379         }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE) 
380         {
381             /* store into buffer smsg_list and send later */
382             buffered_smsg_head = msg_tmp;
383             buffered_smsg_tail = msg_tmp;
384             return 0;
385         }
386         else
387             GNI_RC_CHECK("GNI_SmsgSendWTag", status);
388     }
389 }
390
391 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
392 {
393     PRINT_INFO("Calling LrtsSend")
394     if(useSMSG)
395     {
396         send_with_smsg(destNode, size, msg); 
397     }else
398     {
399         send_with_fma(destNode, size, msg); 
400     }
401     return 0;
402 }
403
404 static void LrtsPreCommonInit(int everReturn){}
405 static void LrtsPostCommonInit(int everReturn){}
406 static void LrtsDrainResources() /* used when exit */
407 {}
408
409 void LrtsPostNonLocal(){}
410 /* pooling CQ to receive network message */
411 static void PumpMsgs()
412 {
413     void *header;
414     uint8_t             tag_data;
415     uint8_t             tag_control;
416     gni_return_t status;
417     uint64_t source_data, source_control, type, inst_id, data;
418     gni_cq_entry_t event_data;
419     int msg_nbytes;
420     void *msg_data;
421     CONTROL_MSG *request_msg;
422     gni_post_descriptor_t pd;
423
424     status = GNI_CqGetEvent(rx_cqh, &event_data);
425
426     if(status == GNI_RC_SUCCESS)
427     {
428         type = GNI_CQ_GET_TYPE(event_data);
429         inst_id = GNI_CQ_GET_INST_ID(event_data);
430     
431     }else
432         return;
433     /* Check local queue about RDMA_GET */
434     if((status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &tag_data)) == GNI_RC_SUCCESS)
435     {
436         /* copy msg out and then put into queue */
437         // memcpy(&msg_nbytes, header, sizeof(int));   
438         msg_nbytes = *(int*)header;
439         msg_data = CmiAlloc(msg_nbytes);
440         memcpy(msg_data, (char*)header+sizeof(int), msg_nbytes);
441         handleOneRecvedMsg(msg_nbytes, msg_data);
442         GNI_SmsgRelease(ep_hndl_array[inst_id]);
443     } else if ((status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &tag_control)) == GNI_RC_SUCCESS)
444     {
445         /* initial a get to transfer data from the sender side */
446         request_msg = (CONTROL_MSG *) header;
447         msg_data = CmiAlloc(request_msg->length); //need align checking
448         /* register this memory */
449         if(request_msg->length <= FMA_BTE_THRESHOLD) 
450             pd.type            = GNI_POST_FMA_GET;
451         else
452             pd.type            = GNI_POST_RDMA_GET;
453
454         pd.cq_mode         = GNI_CQMODE_GLOBAL_EVENT |            GNI_CQMODE_REMOTE_EVENT;
455         pd.dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
456         pd.length          = request_msg->length;
457         pd.local_addr      = (uint64_t) request_msg;
458         pd.remote_addr     = request_msg->source_addr;
459         pd.remote_mem_hndl = request_msg->source_mem_hndl;
460      // rdma specific
461         pd.src_cq_hndl     = tx_cqh;
462         pd.rdma_mode       = 0;
463
464         if(pd.type == GNI_POST_RDMA_PUT) 
465             status = GNI_PostRdma(ep_hndl_array[request_msg->source], &pd);
466         else
467             status = GNI_PostFma(ep_hndl_array[request_msg->source], &pd);
468
469         if(status = GNI_RC_SUCCESS)
470         {
471             /* put into receive buffer queue */
472         }else
473         {
474         }
475
476         GNI_SmsgRelease(ep_hndl_array[inst_id]);
477     }
478
479 }
480
481 /* Check whether message send or get is confirmed by remote */
482 static void ReleaseSentMessages()
483 {
484     gni_cq_entry_t ev;
485     gni_return_t status;
486     uint64_t type, source, inst_id, data_addr;
487     gni_post_descriptor_t *tmp_pd;
488     MSG_LIST  *ptr;
489     status = GNI_CqGetEvent(tx_cqh, &ev);
490     if(status == GNI_RC_SUCCESS)
491     {
492         type        = GNI_CQ_GET_TYPE(ev);
493         inst_id     = GNI_CQ_GET_INST_ID(ev);
494         data_addr   = GNI_CQ_GET_DATA(ev);
495     }else
496         return;
497
498     if(type == GNI_CQ_EVENT_TYPE_POST)
499     {
500         status = GNI_GetCompleted(tx_cqh, ev, &tmp_pd);
501     }
502     /* memory leak here , need to realease struct MSG_list */ 
503     CmiFree((void *)data_addr);
504 }
505
506 static void SendBufferMsg()
507 {
508     int ret;
509     while(buffered_smsg_head != 0)
510     {
511         if(useSMSG)
512         {
513             ret = send_with_smsg(buffered_smsg_head->destNode, buffered_smsg_head->size, buffered_smsg_head->msg); 
514         }else
515         {
516             ret = send_with_fma(buffered_smsg_head->destNode, buffered_smsg_head->size, buffered_smsg_head->msg); 
517         }
518         if(ret == GNI_RC_SUCCESS) 
519         {
520             buffered_smsg_head = buffered_smsg_head->next;
521         }else
522             break;
523     }
524 }
525 static void LrtsAdvanceCommunication()
526 {
527     /*  Receive Msg first */
528
529     //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
530     PumpMsgs();
531     /* Release Sent Msg */
532     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
533     ReleaseSentMessages();
534     //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
535     /* Send buffered Message */
536     SendBufferMsg();
537 }
538
539 void remoteEventHandle(gni_cq_entry_t *event_data, void *context)
540 {
541     gni_return_t status, source_data, source_control;
542     uint64_t            source;
543     void                *header;
544     uint8_t             tag_data;
545     uint8_t             tag_control;
546
547     tag_data = DATA_TAG;
548     tag_control = LMSG_INIT_TAG;
549     /* pool the CQ to check which smsg endpoint to get the data */
550     //status = GNI_CqGetEvent(remote_cq_hndl, &event_data);
551         
552     /* check whether it is data or control information */
553     source = GNI_CQ_GET_SOURCE(*event_data);
554
555     if((status = GNI_SmsgGetNextWTag(ep_hndl_array[source], &header, &tag_data)) == GNI_RC_SUCCESS)
556     {
557         /* copy msg out and then put into queue */
558
559     } else if ((status = GNI_SmsgGetNextWTag(ep_hndl_array[source], &header, &tag_control)) == GNI_RC_SUCCESS)
560     {
561         /* initial a get to transfer data from the sender side */
562     } else
563     {
564
565     }
566
567 }
568
569 /* CDM initialization, register cache, two CQs created, DMA buffer, free list of transaction management structure.  
570  * SMSG - all endpoint pairs created, buffer allocated, connection built (static)
571  * MSQ */
572 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
573 {
574     register int          i;
575     int                   rc;
576     int                   device_id = 0;
577     int                   events_returned;
578     int                   test_id;
579     unsigned int          local_addr;
580     unsigned int          remote_addr;
581     gni_cdm_handle_t      cdm_hndl;
582     gni_nic_handle_t      nic_hndl;
583     gni_return_t          status = GNI_RC_SUCCESS;
584     uint32_t              vmdh_index = -1;
585     uint64_t              *recv_buffer;
586     uint8_t                  ptag;
587     int first_spawned;
588     int                      size, rank;
589
590     void (*local_event_handler)(gni_cq_entry_t *, void *) = &LocalEventHandle;
591     void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
592     void (*remote_bte_event_handler)(gni_cq_entry_t *, void *) = &RemoteBteEventHandle;
593     
594     //useDynamicSmsg = CmiGetArgFlag(argv, "+useDynamicSmsg");
595     //useMsgq = CmiGetArgFlag(argv, "+useMsgq");
596
597
598     MPI_Init(argc, argv);
599     MPI_Comm_rank(MPI_COMM_WORLD,myNodeID);
600     MPI_Comm_size(MPI_COMM_WORLD,numNodes);
601
602
603     size = *numNodes;
604     rank = *myNodeID;
605     
606     ptag = get_ptag();
607     cookie = get_cookie();
608     
609     /* Create and attach to the communication  domain */
610
611     status = GNI_CdmCreate(rank, ptag, cookie, modes, &cdm_hndl);
612     GNI_RC_CHECK("GNI_CdmCreate", status);
613     /* device id The device id is the minor number for the device that is assigned to the device by the system when the device is created. To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX, where X is the device number
614     0 default */
615     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
616     GNI_RC_CHECK("GNI_CdmAttach", status);
617    
618     /* create the local completion queue */
619     /* adaptive control TODO more option */
620     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
621     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &tx_cqh);
622     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, &local_event_handler, NULL, &tx_cqh);
623     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
624     
625     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
626
627     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rx_cqh);
628     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &remote_bte_cq_hndl);
629
630     GNI_RC_CHECK("Create CQ (rx)", status);
631
632     /* create the endpoints. they need to be bound to allow later CQWrites to them */
633     ep_hndl_array = (gni_ep_handle_t*)malloc(size * sizeof(gni_ep_handle_t));
634     CmiAssert(ep_hndl_array != NULL);
635
636     MPID_UGNI_AllAddr = (unsigned int *)gather_nic_addresses();
637     /* include self */
638     for (i=0; i<size; i++) {
639         if(i == rank) continue;
640         status = GNI_EpCreate(nic_hndl, tx_cqh, &ep_hndl_array[i]);
641         GNI_RC_CHECK("GNI_EpCreate ", status);   
642         remote_addr = MPID_UGNI_AllAddr[i];
643         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
644         
645         GNI_RC_CHECK("GNI_EpBind ", status);   
646     }
647     /* Allocate a dma buffer to hook the destination cq to */
648     
649     fma_buffer = (char *)calloc(FMA_PER_CORE*size, 1);
650     CmiAssert(fma_buffer != NULL);
651
652     status = GNI_MemRegister(nic_hndl, (uint64_t)fma_buffer,
653         FMA_PER_CORE*size, rx_cqh, 
654         GNI_MEM_READWRITE | GNI_MEM_USE_GART, vmdh_index,
655         &fma_buffer_mdh_addr);
656
657     GNI_RC_CHECK("Memregister DMA ", status);
658     /* Gather up all of the mdh's over the socket network, 
659      * this also serves as a barrier */
660     fma_buffer_mdh_addr_base = (mdh_addr_t*)malloc(size* sizeof(mdh_addr_t));
661     CmiAssert(fma_buffer_mdh_addr_base);
662
663     my_mdh_addr.addr = (uint64_t)recv_buffer;
664     my_mdh_addr.mdh = fma_buffer_mdh_addr;
665
666     MPI_Allgather(&my_mdh_addr, sizeof(mdh_addr_t), MPI_BYTE, fma_buffer_mdh_addr_base, sizeof(mdh_addr_t), MPI_BYTE, MPI_COMM_WORLD);
667    
668     /*  If use SMSG mode (1) register memory buffer as mailbox (2)gather the buffer on remote peers */
669     if(useMsgq==0 && useSMSG==1 ){
670         PRINT_INFO("Using static SMSG")
671         smsg_memlen = SMSG_BUFFER_SIZE ;
672         smsg_mem_buffer = (char*)calloc(smsg_memlen, 1);
673         status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mem_buffer,
674                                 smsg_memlen, rx_cqh,
675                                 GNI_MEM_READWRITE | GNI_MEM_USE_GART,   
676                                 vmdh_index,
677                                 &my_smsg_mdh_mailbox);
678        
679         GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
680
681         smsg_attr = (gni_smsg_attr_t *)malloc(size*sizeof(gni_smsg_attr_t));
682
683         if(!useDynamicSmsg){
684             for(i=0; i<size; i++)
685             {
686                 smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX;
687                 smsg_attr[i].msg_buffer = smsg_mem_buffer;
688                 smsg_attr[i].buff_size = smsg_memlen;
689                 smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
690                 smsg_attr[i].mbox_offset = smsg_memlen/size*rank;
691                 smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
692                 smsg_attr[i].msg_maxsize = SMSG_PER_MSG;
693             }
694             smsg_attr_vec = (gni_smsg_attr_t*)malloc(size * sizeof(gni_smsg_attr_t));
695             CmiAssert(smsg_attr_vec);
696             MPI_Alltoall(smsg_attr, sizeof(gni_smsg_attr_t), MPI_BYTE, smsg_attr_vec, sizeof(gni_smsg_attr_t), MPI_BYTE, MPI_COMM_WORLD);
697             for(i=0; i<size; i++)
698             {
699                 if (rank == i) continue;
700                 /* initialize the smsg channel */
701                 status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], 
702                     &smsg_attr_vec[i]);
703
704                 GNI_RC_CHECK("SMSG Init", status);
705             } //end initialization
706         } //end static 
707         /* do nothing if dynamic connection */
708     } //end smsg
709     else {
710
711         /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
712         msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
713         msgq_attrs.smsg_q_sz = 1;
714         msgq_attrs.rcv_pool_sz = 1;
715         msgq_attrs.num_msgq_eps = 2;
716         msgq_attrs.nloc_insts = 8;
717         msgq_attrs.modes = 0;
718         msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
719
720         status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
721         GNI_RC_CHECK("MSGQ Init", status);
722     } //end MSGQ
723
724     PRINT_INFO("Done with LrtsInit")
725 }
726
727 static void* LrtsAlloc(int n_bytes)
728 {
729
730     if(n_bytes <= SMSG_PER_MSG)
731     {
732         return malloc(n_bytes);
733     }else if(n_bytes <= FMA_BTE_THRESHOLD)
734     {
735         return malloc(n_bytes);
736     }else 
737     {
738         return memalign(64, n_bytes);
739     }
740 }
741
742 static void  LrtsFree(void *msg)
743 {
744     free(msg);
745 }
746
747
748 static void LrtsExit()
749 {
750     /* free memory ? */
751     MPI_Finalize();
752 }
753 /***********************************************************************
754  *
755  * Abort function:
756  *
757  ************************************************************************/
758
759 void CmiAbort(const char *message) {
760
761     MPI_Abort(MPI_COMM_WORLD, -1);
762 }
763
764 /**************************  TIMER FUNCTIONS **************************/
765
766 #if CMK_TIMER_USE_SPECIAL
767
768 /* MPI calls are not threadsafe, even the timer on some machines */
769 static CmiNodeLock  timerLock = 0;
770                                 static int _absoluteTime = 0;
771                                                            static double starttimer = 0;
772                                                                                       static int _is_global = 0;
773
774 int CmiTimerIsSynchronized() {
775     int  flag;
776     void *v;
777
778     /*  check if it using synchronized timer */
779     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
780         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
781     if (flag) {
782         _is_global = *(int*)v;
783         if (_is_global && CmiMyPe() == 0)
784             printf("Charm++> MPI timer is synchronized\n");
785     }
786     return _is_global;
787 }
788
789 int CmiTimerAbsolute() {
790     return _absoluteTime;
791 }
792
793 double CmiStartTimer() {
794     return 0.0;
795 }
796
797 double CmiInitTime() {
798     return starttimer;
799 }
800
801 void CmiTimerInit(char **argv) {
802     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
803     if (_absoluteTime && CmiMyPe() == 0)
804         printf("Charm++> absolute MPI timer is used\n");
805
806     _is_global = CmiTimerIsSynchronized();
807
808     if (_is_global) {
809         if (CmiMyRank() == 0) {
810             double minTimer;
811 #if CMK_TIMER_USE_XT3_DCLOCK
812             starttimer = dclock();
813 #else
814             starttimer = MPI_Wtime();
815 #endif
816
817             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
818                           MPI_COMM_WORLD );
819             starttimer = minTimer;
820         }
821     } else { /* we don't have a synchronous timer, set our own start time */
822         CmiBarrier();
823         CmiBarrier();
824         CmiBarrier();
825 #if CMK_TIMER_USE_XT3_DCLOCK
826         starttimer = dclock();
827 #else
828         starttimer = MPI_Wtime();
829 #endif
830     }
831
832 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
833     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
834         timerLock = CmiCreateLock();
835 #endif
836     CmiNodeAllBarrier();          /* for smp */
837 }
838
839 /**
840  * Since the timerLock is never created, and is
841  * always NULL, then all the if-condition inside
842  * the timer functions could be disabled right
843  * now in the case of SMP. --Chao Mei
844  */
845 double CmiTimer(void) {
846     double t;
847 #if 0 && CMK_SMP
848     if (timerLock) CmiLock(timerLock);
849 #endif
850
851 #if CMK_TIMER_USE_XT3_DCLOCK
852     t = dclock();
853 #else
854     t = MPI_Wtime();
855 #endif
856
857 #if 0 && CMK_SMP
858     if (timerLock) CmiUnlock(timerLock);
859 #endif
860
861     return _absoluteTime?t: (t-starttimer);
862 }
863
864 double CmiWallTimer(void) {
865     double t;
866 #if 0 && CMK_SMP
867     if (timerLock) CmiLock(timerLock);
868 #endif
869
870 #if CMK_TIMER_USE_XT3_DCLOCK
871     t = dclock();
872 #else
873     t = MPI_Wtime();
874 #endif
875
876 #if 0 && CMK_SMP
877     if (timerLock) CmiUnlock(timerLock);
878 #endif
879
880     return _absoluteTime? t: (t-starttimer);
881 }
882
883 double CmiCpuTimer(void) {
884     double t;
885 #if 0 && CMK_SMP
886     if (timerLock) CmiLock(timerLock);
887 #endif
888 #if CMK_TIMER_USE_XT3_DCLOCK
889     t = dclock() - starttimer;
890 #else
891     t = MPI_Wtime() - starttimer;
892 #endif
893 #if 0 && CMK_SMP
894     if (timerLock) CmiUnlock(timerLock);
895 #endif
896     return t;
897 }
898
899 #endif
900
901 /************Barrier Related Functions****************/
902
903 int CmiBarrier()
904 {
905     int status;
906     status = MPI_Barrier(MPI_COMM_WORLD);
907     return status;
908
909 }
910 /*@}*/
911
912 /*******************************************
913  *
914  * internal function only to this file 
915  */