minor cleanup
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <stdint.h>
16 #include <errno.h>
17 #include <malloc.h>
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #if defined(_WIN32) && ! defined(__CYGWIN__)
26 #include <windows.h>
27 #include <wincon.h>
28 #include <sys/types.h>
29 #include <sys/timeb.h>
30
31 static void sleep(int secs) {
32     Sleep(1000*secs);
33 }
34 #else
35 #include <unistd.h> /*For getpid()*/
36 #endif
37
38 #define PRINT_SYH  0
39
40 #if PRINT_SYH
41 int         lrts_send_request = 0;
42 int         lrts_received_msg = 0;
43 int         lrts_local_done_msg = 0;
44 #endif
45
46 #include "machine.h"
47
48 #include "pcqueue.h"
49
50 //#define  USE_ONESIDED 1
51 #ifdef USE_ONESIDED
52 //onesided implementation is wrong, since no place to restore omdh
53 #include "onesided.h"
54 onesided_hnd_t   onesided_hnd;
55 onesided_md_t    omdh;
56 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
57
58 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
59
60 #else
61 uint8_t   onesided_hnd, omdh;
62 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE|GNI_MEM_USE_GART, -1, mem_hndl)
63
64 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh)  GNI_MemDeregister(nic_hndl, (mem_hndl))
65 #endif
66
67 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
68 #define CmiSetMsgSize(m,s)  do {((((CmiMsgHeaderExt*)m)->size)=(s));} while(0)
69
70 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
71 /* If SMSG is not used */
72 #define FMA_PER_CORE  1024
73 #define FMA_BUFFER_SIZE 1024
74 /* If SMSG is used */
75 #define SMSG_MAX_MSG     1024
76 #define SMSG_MAX_CREDIT  128
77
78 #define MSGQ_MAXSIZE       4096
79 /* large message transfer with FMA or BTE */
80 #define LRTS_GNI_RDMA_THRESHOLD  16384
81
82 #define REMOTE_QUEUE_ENTRIES  1048576
83 #define LOCAL_QUEUE_ENTRIES   10240
84
85 /* SMSG is data message */
86 #define DATA_TAG          0x38
87 /* SMSG is a control message to initialize a BTE */
88 #define LMSG_INIT_TAG     0x39 
89 #define ACK_TAG           0x37
90
91 #define DEBUG
92 #ifdef GNI_RC_CHECK
93 #undef GNI_RC_CHECK
94 #endif
95 #ifdef DEBUG
96 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
97 #else
98 #define GNI_RC_CHECK(msg,rc)
99 #endif
100
101 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
102
103 static int useStaticSMSG   = 1;
104 static int useStaticMSGQ = 0;
105 static int useStaticFMA = 0;
106 static int mysize, myrank;
107 static gni_nic_handle_t      nic_hndl;
108
109
110 static void             *smsg_mailbox_base;
111 gni_msgq_attr_t         msgq_attrs;
112 gni_msgq_handle_t       msgq_handle;
113 gni_msgq_ep_attr_t      msgq_ep_attrs;
114 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
115 /* =====Beginning of Declarations of Machine Specific Variables===== */
116 static int cookie;
117 static int modes = 0;
118 static gni_cq_handle_t       rx_cqh = NULL;
119 static gni_cq_handle_t       rdma_cqh = NULL;
120 static gni_cq_handle_t       tx_cqh = NULL;
121 static gni_ep_handle_t       *ep_hndl_array;
122
123 typedef    struct  pending_smg
124 {
125     int     inst_id;
126     struct  pending_smg *next;
127 } PENDING_GETNEXT;
128
129
130 /* preallocated memory buffer for FMA for short message and control message */
131 typedef struct {
132     gni_mem_handle_t mdh;
133     uint64_t addr;
134 } mdh_addr_t;
135
136 static mdh_addr_t            *fma_buffer_mdh_addr_base;
137
138 typedef struct msg_list
139 {
140     uint32_t destNode;
141     uint32_t size;
142     void *msg;
143     struct msg_list *next;
144     uint8_t tag;
145 }MSG_LIST;
146
147 typedef struct control_msg
148 {
149     uint64_t            source_addr;
150     int                 source;               /* source rank */
151     int                 length;
152     gni_mem_handle_t    source_mem_hndl;
153     struct control_msg *next;
154 }CONTROL_MSG;
155
156 typedef struct  rmda_msg
157 {
158     int                   destNode;
159     gni_post_descriptor_t *pd;
160     struct  rmda_msg      *next;
161 }RDMA_REQUEST;
162
163 /* reuse PendingMsg memory */
164 static CONTROL_MSG          *control_freelist=0;
165 static MSG_LIST             *msglist_freelist=0;
166 static RDMA_REQUEST         *rdma_freelist = 0;
167 #define FreeControlMsg(d)       \
168   do {  \
169   (d)->next = control_freelist;\
170   control_freelist = d;\
171   } while (0); 
172
173 #define MallocControlMsg(d) \
174   d = control_freelist;\
175   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
176              _MEMCHECK(d);\
177   } else control_freelist = d->next;
178
179
180 #define FreeMsgList(d)       \
181   do {  \
182   (d)->next = msglist_freelist;\
183   msglist_freelist = d;\
184   } while (0); 
185
186 #define MallocMsgList(d) \
187   d = msglist_freelist;\
188   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
189              _MEMCHECK(d);\
190   } else msglist_freelist = d->next;
191
192 #define FreeRdmaRequest(d)       \
193   do {  \
194   (d)->next = rdma_freelist;\
195   rdma_freelist = d;\
196   } while (0); 
197
198 #define MallocRdmaRequest(d) \
199   d = rdma_freelist;\
200   if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
201              _MEMCHECK(d);\
202   } else rdma_freelist = d->next;
203
204 /* reuse gni_post_descriptor_t */
205 static gni_post_descriptor_t *post_freelist=NULL;
206
207 #if 1
208 #define FreePostDesc(d)       \
209   do {  \
210     (d)->next_descr = post_freelist;\
211     post_freelist = d;\
212   } while (0); 
213
214 #define MallocPostDesc(d) \
215   d = post_freelist;\
216   if (d==0) { \
217      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
218      _MEMCHECK(d);\
219   } else post_freelist = d->next_descr;
220 #else
221
222 #define FreePostDesc(d)     free(d);
223 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
224
225 #endif
226
227 static PENDING_GETNEXT     *pending_smsg_head = 0;
228 static PENDING_GETNEXT     *pending_smsg_tail = 0;
229
230 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
231 static MSG_LIST *buffered_smsg_head= 0;
232 static MSG_LIST *buffered_smsg_tail= 0;
233
234 /* SmsgSend return success but message sent is not confirmed by remote side */
235
236 static RDMA_REQUEST  *pending_rdma_head = 0;
237 static RDMA_REQUEST  *pending_rdma_tail = 0;
238
239 static MSG_LIST *buffered_fma_head = 0;
240 static MSG_LIST *buffered_fma_tail = 0;
241
242 /* functions  */
243
244 static void
245 allgather(void *in,void *out, int len)
246 {
247     //PMI_Allgather is out of order
248     int i,rc, extend_len;
249     int  rank_index;
250     char *out_ptr, *out_ref;
251     char *in2;
252
253     extend_len = sizeof(int) + len;
254     in2 = (char*)malloc(extend_len);
255
256     memcpy(in2, &myrank, sizeof(int));
257     memcpy(in2+sizeof(int), in, len);
258
259     out_ptr = (char*)malloc(mysize*extend_len);
260
261     rc = PMI_Allgather(in2, out_ptr, extend_len);
262     GNI_RC_CHECK("allgather", rc);
263
264     out_ref = out;
265
266     for(i=0;i<mysize;i++) {
267         //rank index 
268         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
269         //copy to the rank index slot
270         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
271     }
272
273     free(out_ptr);
274     free(in2);
275
276 }
277
278 static unsigned int get_gni_nic_address(int device_id)
279 {
280     unsigned int address, cpu_id;
281     gni_return_t status;
282     int i, alps_dev_id=-1,alps_address=-1;
283     char *token, *p_ptr;
284
285     p_ptr = getenv("PMI_GNI_DEV_ID");
286     if (!p_ptr) {
287         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
288        
289         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
290     } else {
291         while ((token = strtok(p_ptr,":")) != NULL) {
292             alps_dev_id = atoi(token);
293             if (alps_dev_id == device_id) {
294                 break;
295             }
296             p_ptr = NULL;
297         }
298         CmiAssert(alps_dev_id != -1);
299         p_ptr = getenv("PMI_GNI_LOC_ADDR");
300         CmiAssert(p_ptr != NULL);
301         i = 0;
302         while ((token = strtok(p_ptr,":")) != NULL) {
303             if (i == alps_dev_id) {
304                 alps_address = atoi(token);
305                 break;
306             }
307             p_ptr = NULL;
308             ++i;
309         }
310         CmiAssert(alps_address != -1);
311         address = alps_address;
312     }
313     return address;
314 }
315
316 static uint8_t get_ptag(void)
317 {
318     char *p_ptr, *token;
319     uint8_t ptag;
320
321     p_ptr = getenv("PMI_GNI_PTAG");
322     CmiAssert(p_ptr != NULL);
323     token = strtok(p_ptr, ":");
324     ptag = (uint8_t)atoi(token);
325     return ptag;
326         
327 }
328
329 static uint32_t get_cookie(void)
330 {
331     uint32_t cookie;
332     char *p_ptr, *token;
333
334     p_ptr = getenv("PMI_GNI_COOKIE");
335     CmiAssert(p_ptr != NULL);
336     token = strtok(p_ptr, ":");
337     cookie = (uint32_t)atoi(token);
338
339     return cookie;
340 }
341
342 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
343 /* TODO: add any that are related */
344 /* =====End of Definitions of Message-Corruption Related Macros=====*/
345
346
347 #include "machine-lrts.h"
348 #include "machine-common-core.c"
349
350 /* Network progress function is used to poll the network when for
351    messages. This flushes receive buffers on some  implementations*/
352 #if CMK_MACHINE_PROGRESS_DEFINED
353 void CmiMachineProgressImpl() {
354 }
355 #endif
356
357 inline
358 static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
359 {
360     MSG_LIST        *msg_tmp;
361     MallocMsgList(msg_tmp);
362     msg_tmp->destNode = destNode;
363     msg_tmp->size   = size;
364     msg_tmp->msg    = msg;
365     msg_tmp->tag    = tag;
366     msg_tmp->next   = NULL;
367     if (buffered_smsg_tail == NULL) {
368       buffered_smsg_head  = msg_tmp;
369     }
370     else {
371       buffered_smsg_tail->next    = msg_tmp;
372     }
373     buffered_smsg_tail          = msg_tmp;
374     // CmiPrintf("[%d] delay_send_small_msg msg to PE %d  tag: 0x%x \n", myrank, destNode, tag);
375 }
376
377 static int send_with_smsg(int destNode, int size, char *msg)
378 {
379     gni_return_t        status  =   GNI_RC_SUCCESS;
380     CONTROL_MSG         *control_msg_tmp;
381     const uint8_t       tag_data    = DATA_TAG;
382     const uint8_t       tag_control = LMSG_INIT_TAG ;
383     uint32_t            vmdh_index  = -1;
384
385     CmiSetMsgSize(msg, size);
386 #if PRINT_SYH
387     lrts_send_request++;
388     CmiPrintf("LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_request);
389 #endif
390     /* No mailbox available, buffer this msg and its info */
391     if(buffered_smsg_head != 0)
392     {
393         if(size <=SMSG_MAX_MSG)
394         {
395             delay_send_small_msg(msg, size, destNode, tag_data);
396         }
397         else
398         {
399             MallocControlMsg(control_msg_tmp);
400             control_msg_tmp->source_addr    = (uint64_t)msg;
401             control_msg_tmp->source         = myrank;
402             control_msg_tmp->length         =size; 
403             control_msg_tmp->source_mem_hndl.qword1 = 0;
404             control_msg_tmp->source_mem_hndl.qword2 = 0;
405             delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, tag_control);
406         }
407         return 0;
408     }
409     else {
410         /* Can use SMSGSend */
411         if(size <= SMSG_MAX_MSG)
412         {
413             /* send the msg itself */
414             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag_data);
415             //CmiPrintf("[%d] send_with_smsg sends a data msg to PE %d status: %s\n", myrank, destNode, gni_err_str[status]);
416             if (status == GNI_RC_SUCCESS)
417             {
418                 CmiFree(msg);
419                 return 1;
420             }
421             else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
422             {
423                 //CmiPrintf("[%d] data msg add to send queue\n", myrank);
424                 delay_send_small_msg(msg, size, destNode, tag_data);
425                 return 0;
426             }
427             else
428                 GNI_RC_CHECK("GNI_SmsgSendWTag", status);
429         }else
430         {
431             /* construct a control message and send */
432             //control_msg_tmp = (CONTROL_MSG *)malloc(sizeof(CONTROL_MSG));
433             MallocControlMsg(control_msg_tmp);
434             control_msg_tmp->source_addr    = (uint64_t)msg;
435             control_msg_tmp->source         = myrank;
436             control_msg_tmp->length         = size;
437             
438             status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, size, &(control_msg_tmp->source_mem_hndl), &omdh);
439             
440             if(status == GNI_RC_ERROR_RESOURCE || status == GNI_RC_ERROR_NOMEM)
441             {
442                 control_msg_tmp->source_mem_hndl.qword1 = 0;
443                 control_msg_tmp->source_mem_hndl.qword2 = 0;
444             }else if(status == GNI_RC_SUCCESS)
445             {
446                 status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, tag_control);
447                 //CmiPrintf("[%d] send_with_smsg sends a control msg to PE %d status: %d\n", myrank, destNode, status);
448                 if(status == GNI_RC_SUCCESS)
449                 {
450                     FreeControlMsg(control_msg_tmp);
451                     return 1;
452                 }
453             }
454             else
455             {
456                 GNI_RC_CHECK("MemRegister fails at ", status);
457             }
458             
459             // Memory register fails or send fails 
460             /* store into buffer smsg_list and send later */
461             delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, tag_control);
462             return 0;
463         }
464     }
465 }
466
467 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
468 {
469     if(useStaticSMSG)
470     {
471         send_with_smsg(destNode, size, msg); 
472     }
473     else {
474         CmiAssert(0);
475     }
476     return 0;
477 }
478
479 static void LrtsPreCommonInit(int everReturn){}
480
481 /* Idle-state related functions: called in non-smp mode */
482 void CmiNotifyIdleForGemini(void) {
483     LrtsAdvanceCommunication();
484 }
485
486 static void LrtsPostCommonInit(int everReturn)
487 {
488 #if CMK_SMP
489     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
490     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
491 #else
492     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForGemini,NULL);
493 #endif
494
495 }
496
497
498 void LrtsPostNonLocal(){}
499 /* pooling CQ to receive network message */
500 static int  processSmsg(uint64_t inst_id);
501 static void PumpNetworkMsgs()
502 {
503     uint64_t            inst_id;
504     PENDING_GETNEXT     *pending_next;
505     int                 ret;
506     gni_cq_entry_t      event_data;
507     gni_return_t        status;
508
509     while(pending_smsg_head != 0)
510     {
511         pending_next = pending_smsg_head;
512         ret = processSmsg(pending_next->inst_id);
513         if(ret == 0)
514             break;
515         else
516         {
517 #if PRINT_SYH
518             CmiPrintf("Msg does happen %d from %d\n", myrank, pending_next->inst_id);
519 #endif
520             pending_smsg_head=pending_smsg_head->next;
521             free(pending_next);
522         }
523     }
524    
525     while (1) {
526         status = GNI_CqGetEvent(rx_cqh, &event_data);
527         if(status == GNI_RC_SUCCESS)
528         {
529             inst_id = GNI_CQ_GET_INST_ID(event_data);
530             if(GNI_CQ_OVERRUN(event_data))
531             {
532                 CmiPrintf("ERROR in overrun PE:%d\n", myrank);
533                 CmiAbort("Overrun problem and abort");
534             }
535         }else if (status == GNI_RC_NOT_DONE)
536         {
537             return;
538         }else
539         {
540             GNI_RC_CHECK("CQ Get event", status);
541         }
542         ret = processSmsg(inst_id);
543         if (ret == 0) {
544            pending_next = (PENDING_GETNEXT*)malloc(sizeof(PENDING_GETNEXT));   
545            pending_next->next = 0;
546            pending_next->inst_id = inst_id;
547            if(pending_smsg_head == 0)
548            {
549               pending_smsg_head = pending_next;
550            }else
551                pending_smsg_tail->next =pending_next;
552            pending_smsg_tail= pending_next;
553         }
554     }
555 }
556
557 // 0 means no ready message 1means msg received
558 static int  processSmsg(uint64_t inst_id)
559 {
560     void                *header;
561     uint8_t             msg_tag;
562     const uint8_t       data_tag = DATA_TAG;
563     const uint8_t       control_tag = LMSG_INIT_TAG;
564     const uint8_t       ack_tag = ACK_TAG;
565     gni_return_t        status;
566     int                 msg_nbytes;
567     void                *msg_data;
568     gni_mem_handle_t    msg_mem_hndl;
569     CONTROL_MSG         *request_msg;
570     RDMA_REQUEST        *rdma_request_msg;
571     gni_post_descriptor_t *pd;
572     PENDING_GETNEXT     *pending_next;
573  
574     msg_tag = GNI_SMSG_ANY_TAG;
575     status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
576 #if PRINT_SYH
577     CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d, tag=0x%x, status=%s\n", myrank, inst_id, msg_tag, gni_err_str[status]);
578 #endif
579
580     if(status  == GNI_RC_SUCCESS)
581     {
582 #if PRINT_SYH
583         lrts_received_msg++;
584         CmiPrintf("+++[%d] PumpNetwork data msg is received, messageid:%d\n", myrank, lrts_received_msg);
585 #endif
586         /* copy msg out and then put into queue */
587         if(msg_tag == data_tag)
588         {
589             msg_nbytes = CmiGetMsgSize(header);
590             msg_data    = CmiAlloc(msg_nbytes);
591             //CmiPrintf("[%d] PumpNetworkMsgs: get datamsg, size: %d msg id:%d\n", myrank, msg_nbytes, GNI_CQ_GET_MSG_ID(event_data));
592             memcpy(msg_data, (char*)header, msg_nbytes);
593             GNI_SmsgRelease(ep_hndl_array[inst_id]);
594             handleOneRecvedMsg(msg_nbytes, msg_data);
595         }
596         else if(msg_tag == control_tag) 
597         {
598             //CmiPrintf("[%d] PumpNetwork control msg is received\n", myrank);
599             /* initial a get to transfer data from the sender side */
600             request_msg = (CONTROL_MSG *) header;
601             int source = request_msg->source;
602             msg_data = CmiAlloc(request_msg->length);
603             _MEMCHECK(msg_data);
604            
605             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh);
606
607             if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
608             {
609                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
610                 GNI_RC_CHECK("Mem Register before post", status);
611             }
612
613             //buffer this request and send later
614             MallocPostDesc(pd);
615             if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
616                 pd->type            = GNI_POST_FMA_GET;
617             else
618                 pd->type            = GNI_POST_RDMA_GET;
619
620             pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
621             pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
622             pd->length          = ALIGN4(request_msg->length);
623             pd->local_addr      = (uint64_t) msg_data;
624             pd->remote_addr     = request_msg->source_addr;
625             pd->remote_mem_hndl = request_msg->source_mem_hndl;
626             pd->src_cq_hndl     = 0;     /* tx_cqh;  */
627             pd->rdma_mode       = 0;
628
629             GNI_SmsgRelease(ep_hndl_array[inst_id]);
630             //memory registration successful
631             if(status == GNI_RC_SUCCESS)
632             {
633                 pd->local_mem_hndl  = msg_mem_hndl;
634                 if(pd->type == GNI_POST_RDMA_GET) 
635                     status = GNI_PostRdma(ep_hndl_array[source], pd);
636                 else
637                     status = GNI_PostFma(ep_hndl_array[source],  pd);
638             }else
639             {
640                 pd->local_mem_hndl.qword1  = 0; 
641                 pd->local_mem_hndl.qword1  = 0; 
642             }
643             if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
644             {
645                 MallocRdmaRequest(rdma_request_msg);
646                 rdma_request_msg->next = 0;
647                 rdma_request_msg->destNode = inst_id;
648                 if(pending_rdma_head == 0)
649                 {
650                     pending_rdma_head = rdma_request_msg;
651                 }else
652                 {
653                     pending_rdma_tail->next = rdma_request_msg;
654                 }
655                 pending_rdma_tail = rdma_request_msg;
656                 return 1;
657             }else
658                 GNI_RC_CHECK("AFter posting", status);
659         }
660         else if(msg_tag == ack_tag) {
661             /* Get is done, release message . Now put is not used yet*/
662             request_msg = (CONTROL_MSG *) header;
663             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(request_msg->source_mem_hndl), &omdh);
664             
665             CmiFree((void*)request_msg->source_addr);
666             GNI_SmsgRelease(ep_hndl_array[inst_id]);
667             SendRdmaMsg();
668         }else{
669             GNI_SmsgRelease(ep_hndl_array[inst_id]);
670             CmiPrintf("weird tag problem\n");
671             CmiAbort("Unknown tag\n");
672         }
673         return 1;
674     }else 
675     {
676         return 0;
677     }
678 }
679
680 /* Check whether message send or get is confirmed by remote */
681 static void PumpLocalTransactions()
682 {
683     gni_cq_entry_t ev;
684     gni_return_t status;
685     uint64_t type, inst_id;
686     uint8_t         ack_tag = ACK_TAG;
687     gni_post_descriptor_t *tmp_pd;
688     //gni_post_descriptor_t   ack_pd;
689     MSG_LIST  *ptr;
690     CONTROL_MSG *ack_msg_tmp;
691
692     while (1) 
693     {
694         status = GNI_CqGetEvent(tx_cqh, &ev);
695         if(status == GNI_RC_SUCCESS)
696         {
697             type        = GNI_CQ_GET_TYPE(ev);
698             inst_id     = GNI_CQ_GET_INST_ID(ev);
699         }else if (status == GNI_RC_NOT_DONE)
700         {
701             return;
702         }else
703         {
704             GNI_RC_CHECK("CQ Get event", status);
705         }
706
707 #if PRINT_SYH
708         lrts_local_done_msg++;
709         CmiPrintf("*[%d]  PumpLocalTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
710 #endif
711         if (type == GNI_CQ_EVENT_TYPE_SMSG) {
712 #if PRINT_SYH
713             CmiPrintf("**[%d] PumpLocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
714 #endif
715         }
716         else if(type == GNI_CQ_EVENT_TYPE_POST)
717         {
718             status = GNI_GetCompleted(tx_cqh, ev, &tmp_pd);
719             GNI_RC_CHECK("Local CQ completed ", status);
720             //Message is sent, free message , put is not used now
721             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
722             {
723                 CmiFree((void *)tmp_pd->local_addr);
724             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
725             {
726                 /* Send an ACK to remote side */
727                 MallocControlMsg(ack_msg_tmp);
728                 ack_msg_tmp->source = myrank;
729                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
730                 ack_msg_tmp->length=tmp_pd->length; 
731                 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
732                 //CmiPrintf("PE:%d sending ACK back addr=%p \n", myrank, ack_msg_tmp->source_addr); 
733            
734                 if(buffered_smsg_head!=0)
735                 {
736                     delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, ack_tag);
737                 }else
738                 {
739                     status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, ack_tag);
740                     if(status == GNI_RC_SUCCESS)
741                     {
742                         FreeControlMsg(ack_msg_tmp);
743                     }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
744                     {
745                         delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, ack_tag);
746                     }
747                     else
748                         GNI_RC_CHECK("GNI_SmsgSendWTag", status);
749                 }
750                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
751                 
752                 //handleOneRecvedMsg(SIZEFIELD((void*)tmp_pd->local_addr), (void*)tmp_pd->local_addr); 
753                 CmiAssert(SIZEFIELD((void*)tmp_pd->local_addr) == tmp_pd->length);
754                 handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
755                 SendRdmaMsg(); 
756             }
757             FreePostDesc(tmp_pd);
758         }
759     }   /* end of while loop */
760 }
761
762 static int SendRdmaMsg()
763 {
764     gni_return_t            status = GNI_RC_SUCCESS;
765     gni_mem_handle_t        msg_mem_hndl;
766
767     while(pending_rdma_head != 0)
768     {
769         RDMA_REQUEST *ptr=pending_rdma_head;
770         gni_post_descriptor_t *pd = ptr->pd;
771         // register memory first
772         if( pd->local_mem_hndl.qword1  == 0 && pd->local_mem_hndl.qword2  == 0)
773         {
774             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
775         }
776         if(status == GNI_RC_SUCCESS)
777         {
778             if(pd->type == GNI_POST_RDMA_GET) 
779                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
780             else
781                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
782             if(status == GNI_RC_SUCCESS)
783             {
784                 pending_rdma_head = pending_rdma_head->next; 
785                 FreePostDesc(pd);
786                 FreeRdmaRequest(ptr);
787             }
788             else
789                 return 1;
790         }else
791             return 1;
792     } //end while
793     return 0;
794 }
795
796 static int SendBufferMsg()
797 {
798     MSG_LIST            *ptr;
799     CONTROL_MSG         *control_msg_tmp;
800     const uint8_t       tag_data = DATA_TAG;
801     const uint8_t       tag_control = LMSG_INIT_TAG;
802     const uint8_t       tag_ack = ACK_TAG;
803     gni_return_t        status;
804
805     /* can add flow control here to control the number of messages sent before handle message */
806     while(buffered_smsg_head != 0)
807     {
808         if(useStaticSMSG)
809         {
810             ptr = buffered_smsg_head;
811             if(ptr->tag == tag_data)
812             {
813                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], NULL, 0, ptr->msg, ptr->size, 0, tag_data);
814                 //CmiPrintf("[%d] SendBufferMsg sends a data msg to PE %d status: %s\n", myrank, ptr->destNode, gni_err_str[status]);
815                 if(status == GNI_RC_SUCCESS) {
816                     CmiFree(ptr->msg);
817                 }
818             }
819             else if(ptr->tag ==tag_control)
820             {
821                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
822                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
823                 {
824                     MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
825                     if(status != GNI_RC_SUCCESS)
826                         break;
827                 }
828                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, tag_control);
829                 //CmiPrintf("[%d] SendBufferMsg sends a control msg to PE %d status: %d\n", myrank, ptr->destNode, status);
830                 if(status == GNI_RC_SUCCESS) {
831                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
832                 }
833             }else if (ptr->tag == tag_ack)
834             {
835                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, tag_ack);
836                 //CmiPrintf("[%d] SendBufferMsg sends a tag msg to PE %d status: %d\n", myrank, ptr->destNode, status);
837                 if(status == GNI_RC_SUCCESS) {
838                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
839                 }
840             }
841         } else if(useStaticMSGQ)
842         {
843             CmiAbort("MSGQ Send not done\n");
844         }else
845         {
846             CmiAbort("FMA Send not done\n");
847         }
848         if(status == GNI_RC_SUCCESS)
849         {
850             buffered_smsg_head = buffered_smsg_head->next;
851             FreeMsgList(ptr);
852         }else
853             return 0;
854     }
855     return 1;
856 }
857
858 static void LrtsAdvanceCommunication()
859 {
860     /*  Receive Msg first */
861     //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
862     PumpNetworkMsgs();
863     /* Release Sent Msg */
864     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
865     PumpLocalTransactions();
866     //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
867     /* Send buffered Message */
868     SendBufferMsg();
869 }
870
871 static void _init_static_smsg()
872 {
873     gni_smsg_attr_t      *smsg_attr;
874     gni_smsg_attr_t      *smsg_attr_vec;
875     unsigned int         smsg_memlen;
876     gni_mem_handle_t     my_smsg_mdh_mailbox;
877     register    int      i;
878     gni_return_t status;
879     uint32_t              vmdh_index = -1;
880
881      smsg_attr = (gni_smsg_attr_t *)malloc(mysize*sizeof(gni_smsg_attr_t));
882     _MEMCHECK(smsg_attr);
883
884     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
885     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
886     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
887     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
888     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
889     smsg_mailbox_base = memalign(64, smsg_memlen*(mysize-1));
890     _MEMCHECK(smsg_mailbox_base);
891     bzero(smsg_mailbox_base, smsg_memlen);
892     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
893             smsg_memlen, rx_cqh,
894             GNI_MEM_READWRITE | GNI_MEM_USE_GART | GNI_MEM_PI_FLUSH,   
895             vmdh_index,
896             &my_smsg_mdh_mailbox);
897
898     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
899
900     for(i=0; i<mysize; i++)
901     {
902         if(i==myrank)
903             continue;
904         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
905         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
906         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
907         if(i<myrank)
908             smsg_attr[i].mbox_offset = i*smsg_memlen;
909         else
910             smsg_attr[i].mbox_offset = (i-1)*smsg_memlen;
911
912         smsg_attr[i].msg_buffer = smsg_mailbox_base;
913         smsg_attr[i].buff_size = smsg_memlen*(mysize-1);
914         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
915     }
916     smsg_attr_vec = (gni_smsg_attr_t*)malloc(mysize * mysize * sizeof(gni_smsg_attr_t));
917     _MEMCHECK(smsg_attr_vec);
918    
919     allgather(smsg_attr, smsg_attr_vec,  mysize*sizeof(gni_smsg_attr_t));
920     for(i=0; i<mysize; i++)
921     {
922         if (myrank == i) continue;
923         /* initialize the smsg channel */
924         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &smsg_attr_vec[i*mysize+myrank]);
925         GNI_RC_CHECK("SMSG Init", status);
926     } //end initialization
927     free(smsg_attr);
928     free(smsg_attr_vec);
929
930 /*
931     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
932     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
933 */
934
935
936 static void _init_static_msgq()
937 {
938     gni_return_t status;
939     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
940     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
941     msgq_attrs.smsg_q_sz = 1;
942     msgq_attrs.rcv_pool_sz = 1;
943     msgq_attrs.num_msgq_eps = 2;
944     msgq_attrs.nloc_insts = 8;
945     msgq_attrs.modes = 0;
946     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
947
948     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
949     GNI_RC_CHECK("MSGQ Init", status);
950
951
952 }
953 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
954 {
955     register int            i;
956     int                     rc;
957     int                     device_id = 0;
958     unsigned int            remote_addr;
959     gni_cdm_handle_t        cdm_hndl;
960     gni_return_t            status = GNI_RC_SUCCESS;
961     uint32_t                vmdh_index = -1;
962     uint8_t                 ptag;
963     unsigned int            local_addr, *MPID_UGNI_AllAddr;
964     int                     first_spawned;
965     int                     physicalID;
966     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
967     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
968     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
969     
970     //useStaticSMSG = CmiGetArgFlag(*argv, "+useStaticSmsg");
971     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
972     
973     status = PMI_Init(&first_spawned);
974     GNI_RC_CHECK("PMI_Init", status);
975
976     status = PMI_Get_size(&mysize);
977     GNI_RC_CHECK("PMI_Getsize", status);
978
979     status = PMI_Get_rank(&myrank);
980     GNI_RC_CHECK("PMI_getrank", status);
981
982     physicalID = CmiPhysicalNodeID(myrank);
983     
984     printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
985
986     *myNodeID = myrank;
987     *numNodes = mysize;
988   
989     if(myrank == 0)
990     {
991         printf("Charm++> Running on Gemini (GNI)\n");
992     }
993 #ifdef USE_ONESIDED
994     onesided_init(NULL, &onesided_hnd);
995
996     // this is a GNI test, so use the libonesided bypass functionality
997     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
998     local_addr = gniGetNicAddress();
999 #else
1000     ptag = get_ptag();
1001     cookie = get_cookie();
1002     
1003     //Create and attach to the communication  domain */
1004     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
1005     GNI_RC_CHECK("GNI_CdmCreate", status);
1006     //* device id The device id is the minor number for the device
1007     //that is assigned to the device by the system when the device is created.
1008     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
1009     //where X is the device number 0 default 
1010     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
1011     GNI_RC_CHECK("GNI_CdmAttach", status);
1012     local_addr = get_gni_nic_address(0);
1013 #endif
1014     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
1015     _MEMCHECK(MPID_UGNI_AllAddr);
1016     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
1017     /* create the local completion queue */
1018     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
1019     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &tx_cqh);
1020     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
1021     
1022     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
1023
1024     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rx_cqh);
1025     GNI_RC_CHECK("Create CQ (rx)", status);
1026     
1027     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
1028     //GNI_RC_CHECK("Create BTE CQ", status);
1029
1030     /* create the endpoints. they need to be bound to allow later CQWrites to them */
1031     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
1032     _MEMCHECK(ep_hndl_array);
1033
1034     for (i=0; i<mysize; i++) {
1035         if(i == myrank) continue;
1036         status = GNI_EpCreate(nic_hndl, tx_cqh, &ep_hndl_array[i]);
1037         GNI_RC_CHECK("GNI_EpCreate ", status);   
1038         remote_addr = MPID_UGNI_AllAddr[i];
1039         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
1040         GNI_RC_CHECK("GNI_EpBind ", status);   
1041     }
1042     /* Depending on the number of cores in the job, decide different method */
1043     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
1044     if(useStaticSMSG == 1)
1045     {
1046         _init_static_smsg(mysize);
1047     }else if(useStaticMSGQ == 1)
1048     {
1049         _init_static_msgq();
1050     }
1051     free(MPID_UGNI_AllAddr);
1052 }
1053
1054 #define ALIGNBUF                64
1055
1056 void* LrtsAlloc(int n_bytes, int header)
1057 {
1058     if(n_bytes <= SMSG_MAX_MSG)
1059     {
1060         int totalsize = n_bytes+header;
1061         return malloc(totalsize);
1062     }else 
1063     {
1064         CmiAssert(header <= ALIGNBUF);
1065         n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
1066         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1067         return res + ALIGNBUF - header;
1068     }
1069 }
1070
1071 void  LrtsFree(void *msg)
1072 {
1073     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1074     if (size <= SMSG_MAX_MSG)
1075       free(msg);
1076     else
1077       free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1078 }
1079
1080 static void LrtsExit()
1081 {
1082     /* free memory ? */
1083     PMI_Finalize();
1084     exit(0);
1085 }
1086
1087 static void LrtsDrainResources()
1088 {
1089     while (!SendBufferMsg()) {
1090       PumpNetworkMsgs();
1091       PumpLocalTransactions();
1092     }
1093 }
1094
1095 void CmiAbort(const char *message) {
1096
1097     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1098     PMI_Abort(-1, message);
1099 }
1100
1101 /**************************  TIMER FUNCTIONS **************************/
1102 #if CMK_TIMER_USE_SPECIAL
1103 /* MPI calls are not threadsafe, even the timer on some machines */
1104 static CmiNodeLock  timerLock = 0;
1105 static int _absoluteTime = 0;
1106 static double starttimer = 0;
1107 static int _is_global = 0;
1108
1109 int CmiTimerIsSynchronized() {
1110     return 0;
1111 }
1112
1113 int CmiTimerAbsolute() {
1114     return _absoluteTime;
1115 }
1116
1117 double CmiStartTimer() {
1118     return 0.0;
1119 }
1120
1121 double CmiInitTime() {
1122     return starttimer;
1123 }
1124
1125 void CmiTimerInit(char **argv) {
1126 }
1127
1128 /**
1129  * Since the timerLock is never created, and is
1130  * always NULL, then all the if-condition inside
1131  * the timer functions could be disabled right
1132  * now in the case of SMP.
1133  */
1134 double CmiTimer(void) {
1135
1136     return 0;
1137 }
1138
1139 double CmiWallTimer(void) {
1140     return 0;
1141 }
1142
1143 double CmiCpuTimer(void) {
1144     return 0;
1145 }
1146
1147 #endif
1148 /************Barrier Related Functions****************/
1149
1150 int CmiBarrier()
1151 {
1152     int status;
1153     status = PMI_Barrier();
1154     return status;
1155
1156 }