typo
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <errno.h>
16 #include <malloc.h>
17 #include "converse.h"
18
19 #include "gni_pub.h"
20 #include "mpi.h"
21 #include "pmi.h"
22 /*Support for ++debug: */
23 #if defined(_WIN32) && ! defined(__CYGWIN__)
24 #include <windows.h>
25 #include <wincon.h>
26 #include <sys/types.h>
27 #include <sys/timeb.h>
28 static void sleep(int secs) {
29     Sleep(1000*secs);
30 }
31 #else
32 #include <unistd.h> /*For getpid()*/
33 #endif
34 #include <stdlib.h> /*For sleep()*/
35
36 #include "machine.h"
37
38 #include "pcqueue.h"
39
40 #define DEBUY_PRINT
41
42 #ifdef DEBUY_PRINT
43 #define PRINT_INFO(msg) {fprintf(stdout, "[%d] %s\n", CmiMyPe(), msg); fflush(stdout);}
44 #else
45 #define PRINT_INFO(msg)
46 #endif
47 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
48 static int useSMSG   = 1;
49 static int useDynamicSmsg = 0;
50 static int useMsgq = 0;
51 static int size, rank;
52 #define FMA_PER_CORE  1024
53 #define FMA_BUFFER_SIZE 1024
54 #define SMSG_PER_MSG    1024
55 #define SMSG_MAX_CREDIT 16
56 #define SMSG_BUFFER_SIZE        102400
57 #define FMA_BTE_THRESHOLD  4096
58 #define MSGQ_MAXSIZE       4096
59
60 #define DEBUG
61
62 #ifdef GNI_RC_CHECK
63 #undef GNI_RC_CHECK
64 #endif
65
66 #ifdef DEBUG
67 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("%s; err=%s\n",msg,gni_err_str[rc]); exit(911); } } while(0)
68 #else
69 #define GNI_RC_CHECK(msg,rc)
70 #endif
71
72 static gni_nic_handle_t      nic_hndl;
73 static unsigned int         *MPID_UGNI_AllAddr;
74 static gni_smsg_attr_t      *smsg_attr;
75 static gni_smsg_attr_t      *smsg_attr_vec;
76 static char                 *smsg_mem_buffer = NULL;
77 static uint32_t             smsg_memlen;
78
79 gni_msgq_attr_t         msgq_attrs;
80 gni_msgq_handle_t       msgq_handle;
81 gni_msgq_ep_attr_t      msgq_ep_attrs;
82 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
83
84 #define REMOTE_QUEUE_ENTRIES  1048576
85 #define LOCAL_QUEUE_ENTRIES 1024
86 /* SMSG is data message */
87 #define DATA_TAG        0
88 /* SMSG is a control message to initialize a BTE */
89 #define LMSG_INIT_TAG        1 
90 /* =====Beginning of Declarations of Machine Specific Variables===== */
91 static int cookie;
92 static int modes = 0;
93 static gni_cq_handle_t       rx_cqh = NULL;
94 static gni_cq_handle_t       remote_bte_cq_hndl = NULL;
95 static gni_cq_handle_t       tx_cqh = NULL;
96 static gni_ep_handle_t       *ep_hndl_array;
97
98 /* preallocated memory buffer for FMA for short message and control message */
99 static char            *fma_buffer;
100 static int              fma_buffer_len_eachcore = FMA_BUFFER_SIZE;
101 typedef struct {
102     gni_mem_handle_t mdh;
103     uint64_t addr;
104 } mdh_addr_t;
105
106 static mdh_addr_t            my_mdh_addr;
107 static mdh_addr_t            *fma_buffer_mdh_addr_base;
108 static gni_mem_handle_t      fma_buffer_mdh_addr;
109 static gni_mem_handle_t      my_smsg_mdh_mailbox;
110 /* =====Beginning of Declarations of Machine Specific Functions===== */
111
112 typedef struct msg_list
113 {
114     int destNode;
115     int size;
116     void *msg;
117     struct msg_list *next;
118
119 }MSG_LIST;
120
121 typedef struct control_msg
122 {
123     int             source;   //source rank
124     uint64_t        source_addr;
125     gni_mem_handle_t    source_mem_hndl;
126     uint64_t            length;
127 }CONTROL_MSG;
128
129 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
130 static MSG_LIST *buffered_smsg_head= 0;
131 static MSG_LIST *buffered_smsg_tail= 0;
132 /* SmsgSend return success but message sent is not confirmed by remote side */
133
134 static MSG_LIST *buffered_fma_head = 0;
135 static MSG_LIST *buffered_fma_tail = 0;
136
137 static unsigned int get_gni_nic_address(int device_id)
138 {
139     unsigned int address, cpu_id;
140     gni_return_t status;
141     int i, alps_dev_id=-1,alps_address=-1;
142     char *token, *p_ptr;
143
144     p_ptr = getenv("PMI_GNI_DEV_ID");
145     if (!p_ptr) {
146         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
147        
148         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
149     } else {
150         while ((token = strtok(p_ptr,":")) != NULL) {
151             alps_dev_id = atoi(token);
152             if (alps_dev_id == device_id) {
153                 break;
154             }
155             p_ptr = NULL;
156         }
157         CmiAssert(alps_dev_id != -1);
158         p_ptr = getenv("PMI_GNI_LOC_ADDR");
159         CmiAssert(p_ptr != NULL);
160         i = 0;
161         while ((token = strtok(p_ptr,":")) != NULL) {
162             if (i == alps_dev_id) {
163                 alps_address = atoi(token);
164                 break;
165             }
166             p_ptr = NULL;
167             ++i;
168         }
169         CmiAssert(alps_address != -1);
170         address = alps_address;
171     }
172     return address;
173 }
174 static void* gather_nic_addresses(void)
175 {
176     unsigned int local_addr,*alladdrs;
177     int size,rc;
178     size_t addr_len;
179
180     MPI_Comm_size(MPI_COMM_WORLD,&size);
181
182     /*
183      * * just assume a single gemini device
184      */
185     local_addr = get_gni_nic_address(0);
186
187     addr_len = sizeof(unsigned int);
188
189     alladdrs = (unsigned int *)malloc(addr_len * size);
190     CmiAssert(alladdrs != NULL);
191
192     MPI_Allgather(&local_addr, addr_len, MPI_BYTE, alladdrs, addr_len, MPI_BYTE, MPI_COMM_WORLD);
193
194     return (void *)alladdrs;
195
196 }
197
198 static uint8_t get_ptag(void)
199 {
200     char *p_ptr, *token;
201     uint8_t ptag;
202
203     p_ptr = getenv("PMI_GNI_PTAG");
204     CmiAssert(p_ptr != NULL);
205     token = strtok(p_ptr, ":");
206     ptag = (uint8_t)atoi(token);
207     return ptag;
208         
209 }
210
211 static uint32_t get_cookie(void)
212 {
213     uint32_t cookie;
214     char *p_ptr, *token;
215
216     p_ptr = getenv("PMI_GNI_COOKIE");
217     CmiAssert(p_ptr != NULL);
218     token = strtok(p_ptr, ":");
219     cookie = (uint32_t)atoi(token);
220
221     return cookie;
222 }
223
224 /*
225  * Local side event handler
226  *
227  */
228
229 void LocalEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
230 {
231
232     int type;
233
234     type = GNI_CQ_GET_TYPE(*cq_entry);
235
236     if(type == GNI_CQ_EVENT_TYPE_SMSG)
237     {
238
239     }
240 }
241
242 void RemoteSmsgEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
243 {
244 }
245
246 void RemoteBteEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
247 {
248 }
249 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
250 /* TODO: add any that are related */
251 /* =====End of Definitions of Message-Corruption Related Macros=====*/
252
253
254 #include "machine-lrts.h"
255 #include "machine-common.c"
256
257 /* Network progress function is used to poll the network when for
258    messages. This flushes receive buffers on some  implementations*/
259 #if CMK_MACHINE_PROGRESS_DEFINED
260 void CmiMachineProgressImpl() {
261 }
262 #endif
263
264
265 /* 
266  * The message can be copied to registered memory buffer and be sent
267  * This message memory can be registered to network. It depends on which one is cheaper
268  *
269  * register might be better when the msg is large
270  */
271
272 static int send_with_fma(int destNode, int size, char *msg)
273 {
274     gni_post_descriptor_t   pd;
275     gni_return_t status;
276     CONTROL_MSG *control_msg_tmp;
277     MSG_LIST *msg_tmp;
278     if(buffered_fma_head != 0)
279     {
280         msg_tmp = (MSG_LIST *)LrtsAlloc(sizeof(MSG_LIST));
281         msg_tmp->msg = msg;
282         msg_tmp->destNode = destNode;
283         msg_tmp ->size = size;
284         buffered_smsg_tail->next = msg_tmp;
285         buffered_smsg_tail = msg_tmp;
286         return 0;
287     } else
288     {
289         pd.type            = GNI_POST_FMA_PUT;
290         pd.cq_mode         = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT;
291         pd.dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
292         pd.length          = 8;
293         pd.remote_addr     = fma_buffer_mdh_addr_base[destNode].addr + fma_buffer_len_eachcore*destNode;
294         pd.remote_mem_hndl = fma_buffer_mdh_addr_base[destNode].mdh;
295         if(size < FMA_BUFFER_SIZE)
296         {
297             /* send the message */
298             pd.local_addr      = (uint64_t) msg;
299         }else
300         {
301             /* construct a control message and send */
302             control_msg_tmp = (CONTROL_MSG *)LrtsAlloc((int)sizeof(CONTROL_MSG));
303             GNI_MemRegister(nic_hndl, (uint64_t)msg, 
304                 size, remote_bte_cq_hndl,
305                 GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
306                 -1, &(control_msg_tmp->source_mem_hndl));
307             
308             control_msg_tmp->source = _Cmi_mynode;
309             control_msg_tmp->source_addr = (uint64_t)msg;
310         }
311         status = GNI_PostFma(ep_hndl_array[destNode], &pd);
312         if(status == GNI_RC_SUCCESS)
313             return 1;
314         else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE) 
315         {
316             msg_tmp = (MSG_LIST *)LrtsAlloc(sizeof(MSG_LIST));
317             msg_tmp->msg = control_msg_tmp;
318             msg_tmp->destNode = destNode;
319             msg_tmp ->size = size;
320             /* store into buffer fma_list and send later */
321             buffered_fma_head = msg_tmp;
322             buffered_fma_tail = msg_tmp;
323             return 0;
324         }
325     }
326 }
327 static int send_with_smsg(int destNode, int size, char *msg)
328 {
329     gni_return_t status;
330     MSG_LIST *msg_tmp;
331     CONTROL_MSG *control_msg_tmp;
332     uint8_t             tag_data = DATA_TAG;
333     uint8_t             tag_control= LMSG_INIT_TAG ;
334
335     /* No mailbox available */
336     if(buffered_smsg_head != 0)
337     {
338         msg_tmp = (MSG_LIST *)LrtsAlloc(sizeof(MSG_LIST));
339         msg_tmp->msg = msg;
340         msg_tmp->destNode = destNode;
341         msg_tmp ->size = size;
342         buffered_smsg_tail->next = msg_tmp;
343         buffered_smsg_tail = msg_tmp;
344         return 0;
345     }else
346     {
347         /* Can use SMSGSend */
348         if(size < SMSG_PER_MSG)
349         {
350             /* send the msg itself */
351             msg_tmp = (MSG_LIST *)LrtsAlloc(sizeof(MSG_LIST));
352             msg_tmp->msg = msg;
353             msg_tmp->destNode = destNode;
354             msg_tmp ->size = size;
355             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], &(msg_tmp->size), (uint32_t)sizeof(int), msg, (uint32_t)size, 0, tag_data);
356         }else
357         {
358             /* construct a control message and send */
359             control_msg_tmp = (CONTROL_MSG *)LrtsAlloc(sizeof(CONTROL_MSG));
360             
361             GNI_MemRegister(nic_hndl, (uint64_t)msg, 
362                 size, remote_bte_cq_hndl,
363                 GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
364                 -1, &(control_msg_tmp->source_mem_hndl));
365             
366             control_msg_tmp->source = _Cmi_mynode;
367             control_msg_tmp->source_addr = (uint64_t)msg;
368             
369             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, tag_control);
370         }
371         if(status == GNI_RC_SUCCESS)
372         {
373             return 1;
374         }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE) 
375         {
376             msg_tmp = (MSG_LIST *)LrtsAlloc(sizeof(MSG_LIST));
377             msg_tmp->msg = control_msg_tmp;
378             msg_tmp->destNode = destNode;
379             msg_tmp ->size = size;
380             /* store into buffer smsg_list and send later */
381             buffered_smsg_head = msg_tmp;
382             buffered_smsg_tail = msg_tmp;
383             return 0;
384         }
385         else
386             GNI_RC_CHECK("GNI_SmsgSendWTag", status);
387     }
388 }
389
390 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
391 {
392     PRINT_INFO("Calling LrtsSend")
393     if(useSMSG)
394     {
395         send_with_smsg(destNode, size, msg); 
396     }else
397     {
398         send_with_fma(destNode, size, msg); 
399     }
400     return 0;
401 }
402
403 static void LrtsPreCommonInit(int everReturn){}
404 static void LrtsPostCommonInit(int everReturn){}
405 static void LrtsDrainResources() /* used when exit */
406 {}
407
408 void LrtsPostNonLocal(){}
409 /* pooling CQ to receive network message */
410 static void PumpMsgs()
411 {
412     void *header;
413     uint8_t             tag_data;
414     uint8_t             tag_control;
415     gni_return_t status;
416     uint64_t source_data, source_control, type, inst_id, data;
417     gni_cq_entry_t event_data;
418     int msg_nbytes;
419     void *msg_data;
420     CONTROL_MSG *request_msg;
421     gni_post_descriptor_t pd;
422
423     status = GNI_CqGetEvent(rx_cqh, &event_data);
424
425     if(status == GNI_RC_SUCCESS)
426     {
427         type = GNI_CQ_GET_TYPE(event_data);
428         inst_id = GNI_CQ_GET_INST_ID(event_data);
429     
430     }else
431         return;
432     /* Check local queue about RDMA_GET */
433     if((status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &tag_data)) == GNI_RC_SUCCESS)
434     {
435         /* copy msg out and then put into queue */
436         // memcpy(&msg_nbytes, header, sizeof(int));   
437         msg_nbytes = *(int*)header;
438         msg_data = CmiAlloc(msg_nbytes);
439         memcpy(msg_data, (char*)header+sizeof(int), msg_nbytes);
440         handleOneRecvedMsg(msg_nbytes, msg_data);
441         GNI_SmsgRelease(ep_hndl_array[inst_id]);
442     } else if ((status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &tag_control)) == GNI_RC_SUCCESS)
443     {
444         /* initial a get to transfer data from the sender side */
445         request_msg = (CONTROL_MSG *) header;
446         msg_data = CmiAlloc(request_msg->length); //need align checking
447         /* register this memory */
448         if(request_msg->length <= FMA_BTE_THRESHOLD) 
449             pd.type            = GNI_POST_FMA_GET;
450         else
451             pd.type            = GNI_POST_RDMA_GET;
452
453         pd.cq_mode         = GNI_CQMODE_GLOBAL_EVENT |            GNI_CQMODE_REMOTE_EVENT;
454         pd.dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
455         pd.length          = request_msg->length;
456         pd.local_addr      = (uint64_t) request_msg;
457         pd.remote_addr     = request_msg->source_addr;
458         pd.remote_mem_hndl = request_msg->source_mem_hndl;
459      // rdma specific
460         pd.src_cq_hndl     = tx_cqh;
461         pd.rdma_mode       = 0;
462
463         if(pd.type == GNI_POST_RDMA_PUT) 
464             status = GNI_PostRdma(ep_hndl_array[request_msg->source], &pd);
465         else
466             status = GNI_PostFma(ep_hndl_array[request_msg->source], &pd);
467
468         if(status = GNI_RC_SUCCESS)
469         {
470             /* put into receive buffer queue */
471         }else
472         {
473         }
474
475         GNI_SmsgRelease(ep_hndl_array[inst_id]);
476     }
477
478 }
479
480 /* Check whether message send or get is confirmed by remote */
481 static void ReleaseSentMessages()
482 {
483     gni_cq_entry_t ev;
484     gni_return_t status;
485     uint64_t type, source, inst_id, data_addr;
486     gni_post_descriptor_t *tmp_pd;
487     MSG_LIST  *ptr;
488     status = GNI_CqGetEvent(tx_cqh, &ev);
489     if(status == GNI_RC_SUCCESS)
490     {
491         type        = GNI_CQ_GET_TYPE(ev);
492         inst_id     = GNI_CQ_GET_INST_ID(ev);
493         data_addr   = GNI_CQ_GET_DATA(ev);
494     }else
495         return;
496
497     if(type == GNI_CQ_EVENT_TYPE_POST)
498     {
499         status = GNI_GetCompleted(tx_cqh, ev, &tmp_pd);
500     }
501     /* memory leak here , need to realease struct MSG_list */ 
502     CmiFree((void *)data_addr);
503 }
504
505 static void SendBufferMsg()
506 {
507     int ret;
508     while(buffered_smsg_head != 0)
509     {
510         if(useSMSG)
511         {
512             ret = send_with_smsg(buffered_smsg_head->destNode, buffered_smsg_head->size, buffered_smsg_head->msg); 
513         }else
514         {
515             ret = send_with_fma(buffered_smsg_head->destNode, buffered_smsg_head->size, buffered_smsg_head->msg); 
516         }
517         if(ret == GNI_RC_SUCCESS) 
518         {
519             buffered_smsg_head = buffered_smsg_head->next;
520         }else
521             break;
522     }
523 }
524 static void LrtsAdvanceCommunication()
525 {
526     /*  Receive Msg first */
527
528     //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
529     PumpMsgs();
530     /* Release Sent Msg */
531     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
532     ReleaseSentMessages();
533     //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
534     /* Send buffered Message */
535     SendBufferMsg();
536 }
537
538 void remoteEventHandle(gni_cq_entry_t *event_data, void *context)
539 {
540     gni_return_t status, source_data, source_control;
541     uint64_t            source;
542     void                *header;
543     uint8_t             tag_data;
544     uint8_t             tag_control;
545
546     tag_data = DATA_TAG;
547     tag_control = LMSG_INIT_TAG;
548     /* pool the CQ to check which smsg endpoint to get the data */
549     //status = GNI_CqGetEvent(remote_cq_hndl, &event_data);
550         
551     /* check whether it is data or control information */
552     source = GNI_CQ_GET_SOURCE(*event_data);
553
554     if((status = GNI_SmsgGetNextWTag(ep_hndl_array[source], &header, &tag_data)) == GNI_RC_SUCCESS)
555     {
556         /* copy msg out and then put into queue */
557
558     } else if ((status = GNI_SmsgGetNextWTag(ep_hndl_array[source], &header, &tag_control)) == GNI_RC_SUCCESS)
559     {
560         /* initial a get to transfer data from the sender side */
561     } else
562     {
563
564     }
565
566 }
567
568 /* CDM initialization, register cache, two CQs created, DMA buffer, free list of transaction management structure.  
569  * SMSG - all endpoint pairs created, buffer allocated, connection built (static)
570  * MSQ */
571 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
572 {
573     register int          i;
574     int                   rc;
575     int                   device_id = 0;
576     int                   events_returned;
577     int                   test_id;
578     unsigned int          local_addr;
579     unsigned int          remote_addr;
580     gni_cdm_handle_t      cdm_hndl;
581     gni_nic_handle_t      nic_hndl;
582     gni_return_t          status = GNI_RC_SUCCESS;
583     uint32_t              vmdh_index = -1;
584     uint64_t              *recv_buffer;
585     uint8_t                  ptag;
586     int first_spawned;
587     int                      size, rank;
588
589     void (*local_event_handler)(gni_cq_entry_t *, void *) = &LocalEventHandle;
590     void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
591     void (*remote_bte_event_handler)(gni_cq_entry_t *, void *) = &RemoteBteEventHandle;
592     
593     //useDynamicSmsg = CmiGetArgFlag(argv, "+useDynamicSmsg");
594     //useMsgq = CmiGetArgFlag(argv, "+useMsgq");
595
596
597     MPI_Init(argc, argv);
598     MPI_Comm_rank(MPI_COMM_WORLD,myNodeID);
599     MPI_Comm_size(MPI_COMM_WORLD,numNodes);
600
601
602     size = *numNodes;
603     rank = *myNodeID;
604     
605     ptag = get_ptag();
606     cookie = get_cookie();
607     
608     /* Create and attach to the communication  domain */
609
610     status = GNI_CdmCreate(rank, ptag, cookie, modes, &cdm_hndl);
611     GNI_RC_CHECK("GNI_CdmCreate", status);
612     /* device id The device id is the minor number for the device that is assigned to the device by the system when the device is created. To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX, where X is the device number
613     0 default */
614     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
615     GNI_RC_CHECK("GNI_CdmAttach", status);
616    
617     /* create the local completion queue */
618     /* adaptive control TODO more option */
619     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
620     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &tx_cqh);
621     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, &local_event_handler, NULL, &tx_cqh);
622     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
623     
624     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
625
626     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rx_cqh);
627     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &remote_bte_cq_hndl);
628
629     GNI_RC_CHECK("Create CQ (rx)", status);
630
631     /* create the endpoints. they need to be bound to allow later CQWrites to them */
632     ep_hndl_array = (gni_ep_handle_t*)malloc(size * sizeof(gni_ep_handle_t));
633     CmiAssert(ep_hndl_array != NULL);
634
635     MPID_UGNI_AllAddr = (unsigned int *)gather_nic_addresses();
636     /* include self */
637     for (i=0; i<size; i++) {
638         if(i == rank) continue;
639         status = GNI_EpCreate(nic_hndl, tx_cqh, &ep_hndl_array[i]);
640         GNI_RC_CHECK("GNI_EpCreate ", status);   
641         remote_addr = MPID_UGNI_AllAddr[i];
642         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
643         
644         GNI_RC_CHECK("GNI_EpBind ", status);   
645     }
646     /* Allocate a dma buffer to hook the destination cq to */
647     
648     fma_buffer = (char *)calloc(FMA_PER_CORE*size, 1);
649     CmiAssert(fma_buffer != NULL);
650
651     status = GNI_MemRegister(nic_hndl, (uint64_t)fma_buffer,
652         FMA_PER_CORE*size, rx_cqh, 
653         GNI_MEM_READWRITE | GNI_MEM_USE_GART, vmdh_index,
654         &fma_buffer_mdh_addr);
655
656     GNI_RC_CHECK("Memregister DMA ", status);
657     /* Gather up all of the mdh's over the socket network, 
658      * this also serves as a barrier */
659     fma_buffer_mdh_addr_base = (mdh_addr_t*)malloc(size* sizeof(mdh_addr_t));
660     CmiAssert(fma_buffer_mdh_addr_base);
661
662     my_mdh_addr.addr = (uint64_t)recv_buffer;
663     my_mdh_addr.mdh = fma_buffer_mdh_addr;
664
665     MPI_Allgather(&my_mdh_addr, sizeof(mdh_addr_t), MPI_BYTE, fma_buffer_mdh_addr_base, sizeof(mdh_addr_t), MPI_BYTE, MPI_COMM_WORLD);
666    
667     /*  If use SMSG mode (1) register memory buffer as mailbox (2)gather the buffer on remote peers */
668     if(useMsgq==0 && useSMSG==1 ){
669         PRINT_INFO("Using static SMSG")
670         smsg_memlen = SMSG_BUFFER_SIZE ;
671         smsg_mem_buffer = (char*)calloc(smsg_memlen, 1);
672         status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mem_buffer,
673                                 smsg_memlen, rx_cqh,
674                                 GNI_MEM_READWRITE | GNI_MEM_USE_GART,   
675                                 vmdh_index,
676                                 &my_smsg_mdh_mailbox);
677        
678         GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
679
680         smsg_attr = (gni_smsg_attr_t *)malloc(size*sizeof(gni_smsg_attr_t));
681
682         if(!useDynamicSmsg){
683             for(i=0; i<size; i++)
684             {
685                 smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX;
686                 smsg_attr[i].msg_buffer = smsg_mem_buffer;
687                 smsg_attr[i].buff_size = smsg_memlen;
688                 smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
689                 smsg_attr[i].mbox_offset = smsg_memlen/size*rank;
690                 smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
691                 smsg_attr[i].msg_maxsize = SMSG_PER_MSG;
692             }
693             smsg_attr_vec = (gni_smsg_attr_t*)malloc(size * sizeof(gni_smsg_attr_t));
694             CmiAssert(smsg_attr_vec);
695             MPI_Alltoall(smsg_attr, sizeof(gni_smsg_attr_t), MPI_BYTE, smsg_attr_vec, sizeof(gni_smsg_attr_t), MPI_BYTE, MPI_COMM_WORLD);
696             for(i=0; i<size; i++)
697             {
698                 if (rank == i) continue;
699                 /* initialize the smsg channel */
700                 status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], 
701                     &smsg_attr_vec[i]);
702
703                 GNI_RC_CHECK("SMSG Init", status);
704             } //end initialization
705         } //end static 
706         /* do nothing if dynamic connection */
707     } //end smsg
708     else {
709
710         /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
711         msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
712         msgq_attrs.smsg_q_sz = 1;
713         msgq_attrs.rcv_pool_sz = 1;
714         msgq_attrs.num_msgq_eps = 2;
715         msgq_attrs.nloc_insts = 8;
716         msgq_attrs.modes = 0;
717         msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
718
719         status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
720         GNI_RC_CHECK("MSGQ Init", status);
721     } //end MSGQ
722
723     PRINT_INFO("Done with LrtsInit")
724 }
725
726 static void* LrtsAlloc(int n_bytes)
727 {
728
729     if(n_bytes <= SMSG_PER_MSG)
730     {
731         return malloc(n_bytes);
732     }else if(n_bytes <= FMA_BTE_THRESHOLD)
733     {
734         return malloc(n_bytes);
735     }else 
736     {
737         return memalign(64, n_bytes);
738     }
739 }
740
741 static void  LrtsFree(void *msg)
742 {
743     free(msg);
744 }
745
746
747 static void LrtsExit()
748 {
749     /* free memory ? */
750     MPI_Finalize();
751 }
752 /***********************************************************************
753  *
754  * Abort function:
755  *
756  ************************************************************************/
757
758 void CmiAbort(const char *message) {
759
760     MPI_Abort(MPI_COMM_WORLD, -1);
761 }
762
763 /**************************  TIMER FUNCTIONS **************************/
764
765 #if CMK_TIMER_USE_SPECIAL
766
767 /* MPI calls are not threadsafe, even the timer on some machines */
768 static CmiNodeLock  timerLock = 0;
769                                 static int _absoluteTime = 0;
770                                                            static double starttimer = 0;
771                                                                                       static int _is_global = 0;
772
773 int CmiTimerIsSynchronized() {
774     int  flag;
775     void *v;
776
777     /*  check if it using synchronized timer */
778     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
779         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
780     if (flag) {
781         _is_global = *(int*)v;
782         if (_is_global && CmiMyPe() == 0)
783             printf("Charm++> MPI timer is synchronized\n");
784     }
785     return _is_global;
786 }
787
788 int CmiTimerAbsolute() {
789     return _absoluteTime;
790 }
791
792 double CmiStartTimer() {
793     return 0.0;
794 }
795
796 double CmiInitTime() {
797     return starttimer;
798 }
799
800 void CmiTimerInit(char **argv) {
801     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
802     if (_absoluteTime && CmiMyPe() == 0)
803         printf("Charm++> absolute MPI timer is used\n");
804
805     _is_global = CmiTimerIsSynchronized();
806
807     if (_is_global) {
808         if (CmiMyRank() == 0) {
809             double minTimer;
810 #if CMK_TIMER_USE_XT3_DCLOCK
811             starttimer = dclock();
812 #else
813             starttimer = MPI_Wtime();
814 #endif
815
816             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
817                           MPI_COMM_WORLD );
818             starttimer = minTimer;
819         }
820     } else { /* we don't have a synchronous timer, set our own start time */
821         CmiBarrier();
822         CmiBarrier();
823         CmiBarrier();
824 #if CMK_TIMER_USE_XT3_DCLOCK
825         starttimer = dclock();
826 #else
827         starttimer = MPI_Wtime();
828 #endif
829     }
830
831 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
832     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
833         timerLock = CmiCreateLock();
834 #endif
835     CmiNodeAllBarrier();          /* for smp */
836 }
837
838 /**
839  * Since the timerLock is never created, and is
840  * always NULL, then all the if-condition inside
841  * the timer functions could be disabled right
842  * now in the case of SMP. --Chao Mei
843  */
844 double CmiTimer(void) {
845     double t;
846 #if 0 && CMK_SMP
847     if (timerLock) CmiLock(timerLock);
848 #endif
849
850 #if CMK_TIMER_USE_XT3_DCLOCK
851     t = dclock();
852 #else
853     t = MPI_Wtime();
854 #endif
855
856 #if 0 && CMK_SMP
857     if (timerLock) CmiUnlock(timerLock);
858 #endif
859
860     return _absoluteTime?t: (t-starttimer);
861 }
862
863 double CmiWallTimer(void) {
864     double t;
865 #if 0 && CMK_SMP
866     if (timerLock) CmiLock(timerLock);
867 #endif
868
869 #if CMK_TIMER_USE_XT3_DCLOCK
870     t = dclock();
871 #else
872     t = MPI_Wtime();
873 #endif
874
875 #if 0 && CMK_SMP
876     if (timerLock) CmiUnlock(timerLock);
877 #endif
878
879     return _absoluteTime? t: (t-starttimer);
880 }
881
882 double CmiCpuTimer(void) {
883     double t;
884 #if 0 && CMK_SMP
885     if (timerLock) CmiLock(timerLock);
886 #endif
887 #if CMK_TIMER_USE_XT3_DCLOCK
888     t = dclock() - starttimer;
889 #else
890     t = MPI_Wtime() - starttimer;
891 #endif
892 #if 0 && CMK_SMP
893     if (timerLock) CmiUnlock(timerLock);
894 #endif
895     return t;
896 }
897
898 #endif
899
900 /************Barrier Related Functions****************/
901
902 int CmiBarrier()
903 {
904     int status;
905     status = MPI_Barrier(MPI_COMM_WORLD);
906     return status;
907
908 }
909 /*@}*/
910
911 /*******************************************
912  *
913  * internal function only to this file 
914  */