minor changes including reuse gni_post_descriptor_t
[charm.git] / src / arch / gemini_gni / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$  Yanhua Sun
4  * $Date$  07-01-2011
5  * $Revision$ 
6  *****************************************************************************/
7
8 /** @file
9  * Gemini GNI machine layer
10  */
11 /*@{*/
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <errno.h>
16 #include <malloc.h>
17 #include "converse.h"
18
19 #include "gni_pub.h"
20 #include "pmi.h"
21 /*Support for ++debug: */
22 #if defined(_WIN32) && ! defined(__CYGWIN__)
23 #include <windows.h>
24 #include <wincon.h>
25 #include <sys/types.h>
26 #include <sys/timeb.h>
27
28 //#define  USE_ONESIDED 1
29
30 #ifdef USE_ONESIDED
31 #include "onesided.h"
32 #endif
33 static void sleep(int secs) {
34     Sleep(1000*secs);
35 }
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #include "machine.h"
42
43 #include "pcqueue.h"
44
45 #define DEBUY_PRINT
46
47 #ifdef DEBUY_PRINT
48 #define PRINT_INFO(msg) {fprintf(stdout, "[%d] %s\n", CmiMyPe(), msg); fflush(stdout);}
49 #else
50 #define PRINT_INFO(msg)
51 #endif
52 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
53 /* If SMSG is not used */
54 #define FMA_PER_CORE  1024
55 #define FMA_BUFFER_SIZE 1024
56 /* If SMSG is used */
57 #define SMSG_MAX_MSG    1024
58 #define SMSG_MAX_CREDIT 16
59
60 #define MSGQ_MAXSIZE       4096
61 /* large message transfer with FMA or BTE */
62 #define LRTS_GNI_RDMA_THRESHOLD  16384
63
64 #define REMOTE_QUEUE_ENTRIES  1048576
65 #define LOCAL_QUEUE_ENTRIES 1024
66 /* SMSG is data message */
67 #define DATA_TAG        0x38
68 /* SMSG is a control message to initialize a BTE */
69 #define LMSG_INIT_TAG        0x39 
70 #define ACK_TAG         0x37
71
72 #define DEBUG
73 #ifdef GNI_RC_CHECK
74 #undef GNI_RC_CHECK
75 #endif
76 #ifdef DEBUG
77 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           CmiPrintf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
78 #else
79 #define GNI_RC_CHECK(msg,rc)
80 #endif
81
82 #ifdef USE_ONESIDED
83 onesided_hnd_t   onesided_hnd;
84 onesided_md_t    omdh;
85 #endif
86
87 static int useStaticSMSG   = 1;
88 static int useStaticMSGQ = 0;
89 static int useStaticFMA = 0;
90 static int mysize, myrank;
91 static gni_nic_handle_t      nic_hndl;
92
93
94 static void             **smsg_attr_ptr;
95 gni_msgq_attr_t         msgq_attrs;
96 gni_msgq_handle_t       msgq_handle;
97 gni_msgq_ep_attr_t      msgq_ep_attrs;
98 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
99 /* =====Beginning of Declarations of Machine Specific Variables===== */
100 static int cookie;
101 static int modes = 0;
102 static gni_cq_handle_t       rx_cqh = NULL;
103 static gni_cq_handle_t       remote_bte_cq_hndl = NULL;
104 static gni_cq_handle_t       tx_cqh = NULL;
105 static gni_ep_handle_t       *ep_hndl_array;
106
107 /* preallocated memory buffer for FMA for short message and control message */
108 static int              fma_buffer_len_eachcore = FMA_BUFFER_SIZE;
109 typedef struct {
110     gni_mem_handle_t mdh;
111     uint64_t addr;
112 } mdh_addr_t;
113
114 static mdh_addr_t            *fma_buffer_mdh_addr_base;
115 typedef struct msg_list
116 {
117     uint32_t destNode;
118     uint32_t size;
119     void *msg;
120     struct msg_list *next;
121     uint8_t tag;
122 }MSG_LIST;
123
124 typedef struct control_msg
125 {
126     uint64_t            source_addr;
127     int                 source;               /* source rank */
128     int                 length;
129     gni_mem_handle_t    source_mem_hndl;
130     struct control_msg *next;
131 }CONTROL_MSG;
132
133 /* reuse PendingMsg memory */
134 static CONTROL_MSG  *control_freelist=0;
135 static MSG_LIST     *msglist_freelist=0;
136
137 #define FreeControlMsg(d)       \
138   do {  \
139   (d)->next = control_freelist;\
140   control_freelist = d;\
141   } while (0); 
142
143 #define MallocControlMsg(d) \
144   d = control_freelist;\
145   if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
146              _MEMCHECK(d);\
147   } else control_freelist = d->next;
148
149
150 #define FreeMsgList(d)       \
151   do {  \
152   (d)->next = msglist_freelist;\
153   msglist_freelist = d;\
154   } while (0); 
155
156 #define MallocMsgList(d) \
157   d = msglist_freelist;\
158   if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
159              _MEMCHECK(d);\
160   } else msglist_freelist = d->next;
161
162 /* reuse gni_post_descriptor_t */
163 static gni_post_descriptor_t *post_freelist=NULL;
164
165 #define FreePostDesc(d)       \
166   do {  \
167     (d)->next_descr = post_freelist;\
168     post_freelist = d;\
169   } while (0); 
170
171 #define MallocPostDesc(d) \
172   d = post_freelist;\
173   if (d==0) { \
174      d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
175      _MEMCHECK(d);\
176   } else post_freelist = d->next_descr;
177
178
179 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
180 static MSG_LIST *buffered_smsg_head= 0;
181 static MSG_LIST *buffered_smsg_tail= 0;
182 /* SmsgSend return success but message sent is not confirmed by remote side */
183
184 static MSG_LIST *buffered_fma_head = 0;
185 static MSG_LIST *buffered_fma_tail = 0;
186
187 /* functions  */
188
189
190 static void* LrtsAllocRegister(int n_bytes, gni_mem_handle_t* mem_hndl);
191
192 static void
193 allgather(void *in,void *out, int len)
194 {
195     //PMI_Allgather is out of order
196     int i,rc, extend_len;
197     int  rank_index;
198     char *out_ptr, *out_ref;
199     char *in2;
200
201     extend_len = sizeof(int) + len;
202     in2 = (char*)malloc(extend_len);
203
204     memcpy(in2, &myrank, sizeof(int));
205     memcpy(in2+sizeof(int), in, len);
206
207     out_ptr = (char*)malloc(mysize*extend_len);
208
209     rc = PMI_Allgather(in2, out_ptr, extend_len);
210     GNI_RC_CHECK("allgather", rc);
211
212     out_ref = out;
213
214     for(i=0;i<mysize;i++) {
215         //rank index 
216         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
217         //copy to the rank index slot
218         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
219     }
220
221     free(out_ptr);
222     free(in2);
223
224 }
225
226 static unsigned int get_gni_nic_address(int device_id)
227 {
228     unsigned int address, cpu_id;
229     gni_return_t status;
230     int i, alps_dev_id=-1,alps_address=-1;
231     char *token, *p_ptr;
232
233     p_ptr = getenv("PMI_GNI_DEV_ID");
234     if (!p_ptr) {
235         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
236        
237         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
238     } else {
239         while ((token = strtok(p_ptr,":")) != NULL) {
240             alps_dev_id = atoi(token);
241             if (alps_dev_id == device_id) {
242                 break;
243             }
244             p_ptr = NULL;
245         }
246         CmiAssert(alps_dev_id != -1);
247         p_ptr = getenv("PMI_GNI_LOC_ADDR");
248         CmiAssert(p_ptr != NULL);
249         i = 0;
250         while ((token = strtok(p_ptr,":")) != NULL) {
251             if (i == alps_dev_id) {
252                 alps_address = atoi(token);
253                 break;
254             }
255             p_ptr = NULL;
256             ++i;
257         }
258         CmiAssert(alps_address != -1);
259         address = alps_address;
260     }
261     return address;
262 }
263
264 static uint8_t get_ptag(void)
265 {
266     char *p_ptr, *token;
267     uint8_t ptag;
268
269     p_ptr = getenv("PMI_GNI_PTAG");
270     CmiAssert(p_ptr != NULL);
271     token = strtok(p_ptr, ":");
272     ptag = (uint8_t)atoi(token);
273     return ptag;
274         
275 }
276
277 static uint32_t get_cookie(void)
278 {
279     uint32_t cookie;
280     char *p_ptr, *token;
281
282     p_ptr = getenv("PMI_GNI_COOKIE");
283     CmiAssert(p_ptr != NULL);
284     token = strtok(p_ptr, ":");
285     cookie = (uint32_t)atoi(token);
286
287     return cookie;
288 }
289
290 /*
291  * Local side event handler
292  *
293  */
294
295 void LocalEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
296 {
297
298     int type;
299
300     type = GNI_CQ_GET_TYPE(*cq_entry);
301
302     if(type == GNI_CQ_EVENT_TYPE_SMSG)
303     {
304
305     }
306 }
307
308 void RemoteSmsgEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
309 {
310 }
311
312 void RemoteBteEventHandle(gni_cq_entry_t *cq_entry, void *userdata)
313 {
314 }
315 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
316 /* TODO: add any that are related */
317 /* =====End of Definitions of Message-Corruption Related Macros=====*/
318
319
320 #include "machine-lrts.h"
321 #include "machine-common-core.c"
322
323 /* Network progress function is used to poll the network when for
324    messages. This flushes receive buffers on some  implementations*/
325 #if CMK_MACHINE_PROGRESS_DEFINED
326 void CmiMachineProgressImpl() {
327 }
328 #endif
329
330
331 /* 
332  * The message can be copied to registered memory buffer and be sent
333  * This message memory can be registered to network. It depends on which one is cheaper
334  * register might be better when the msg is large
335  */
336
337 static int send_with_fma(int destNode, int size, char *msg)
338 {
339     gni_post_descriptor_t   pd;
340     gni_return_t status;
341     CONTROL_MSG *control_msg_tmp;
342     MSG_LIST *msg_tmp;
343     uint32_t              vmdh_index = -1;
344     if(buffered_fma_head != 0)
345     {
346         MallocMsgList(msg_tmp);
347         msg_tmp->msg = msg;
348         msg_tmp->destNode = destNode;
349         msg_tmp ->size = size;
350         buffered_smsg_tail->next = msg_tmp;
351         buffered_smsg_tail = msg_tmp;
352         return 0;
353     } else
354     {
355         pd.type            = GNI_POST_FMA_PUT;
356         pd.cq_mode         = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT;
357         pd.dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
358         pd.length          = 8;
359         pd.remote_addr     = fma_buffer_mdh_addr_base[destNode].addr + fma_buffer_len_eachcore*destNode;
360         pd.remote_mem_hndl = fma_buffer_mdh_addr_base[destNode].mdh;
361         if(size < FMA_BUFFER_SIZE)
362         {
363             /* send the message */
364             pd.local_addr      = (uint64_t) msg;
365         }else
366         {
367             /* construct a control message and send */
368             control_msg_tmp = (CONTROL_MSG *)malloc((int)sizeof(CONTROL_MSG));
369             GNI_MemRegister(nic_hndl, (uint64_t)msg, 
370                 size, remote_bte_cq_hndl,
371                 GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
372                 vmdh_index, &(control_msg_tmp->source_mem_hndl));
373             
374             control_msg_tmp->source = _Cmi_mynode;
375             control_msg_tmp->source_addr = (uint64_t)msg;
376             control_msg_tmp->length = size;
377         }
378         status = GNI_PostFma(ep_hndl_array[destNode], &pd);
379         if(status == GNI_RC_SUCCESS)
380             return 1;
381         else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE) 
382         {
383             MallocMsgList(msg_tmp);
384             msg_tmp->msg = control_msg_tmp;
385             msg_tmp->destNode = destNode;
386             msg_tmp ->size = size;
387             /* store into buffer fma_list and send later */
388             buffered_fma_head = msg_tmp;
389             buffered_fma_tail = msg_tmp;
390             return 0;
391         }
392     }
393 }
394
395 static int send_with_smsg(int destNode, int size, char *msg)
396 {
397     gni_return_t        status;
398     MSG_LIST            *msg_tmp;
399     CONTROL_MSG         *control_msg_tmp;
400     uint8_t             tag_data    = DATA_TAG;
401     uint8_t             tag_control = LMSG_INIT_TAG ;
402     uint32_t            vmdh_index  = -1;
403
404     /* No mailbox available, buffer this msg and its info */
405     if(buffered_smsg_head != 0)
406     {
407         MallocMsgList(msg_tmp);
408         msg_tmp->destNode = destNode;
409         if(size <=SMSG_MAX_MSG)
410         {
411             msg_tmp->size   = size;
412             msg_tmp->msg    = msg;
413             msg_tmp->tag    = tag_data;
414         }else
415         {
416             MallocControlMsg(control_msg_tmp);
417
418             status = GNI_MemRegister(nic_hndl, (uint64_t)msg, 
419                 size, rx_cqh,
420                 GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
421                 vmdh_index, &(control_msg_tmp->source_mem_hndl));
422             
423             GNI_RC_CHECK("MemRegister fails at ", status);
424             control_msg_tmp->source_addr    = (uint64_t)msg;
425             control_msg_tmp->source         = myrank;
426             control_msg_tmp->length         =size; 
427             msg_tmp->size                   = sizeof(CONTROL_MSG);
428             msg_tmp->msg                    = control_msg_tmp;
429             msg_tmp->tag                    = tag_control;
430         }
431         msg_tmp->next               = 0;
432         buffered_smsg_tail->next    = msg_tmp;
433         buffered_smsg_tail          = msg_tmp;
434         return 0;
435     }
436     else {
437         /* Can use SMSGSend */
438         if(size <= SMSG_MAX_MSG)
439         {
440             /* send the msg itself */
441             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], &size, sizeof(int), msg, size, 0, tag_data);
442             if (status == GNI_RC_SUCCESS)
443             {
444                 CmiFree(msg);
445                 return 1;
446             }
447             else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
448             {
449                 //CmiPrintf("[%d] data msg add to send queue\n", myrank);
450                 MallocMsgList(msg_tmp);
451                 msg_tmp->destNode   = destNode;
452                 msg_tmp->size       = size;
453                 msg_tmp->msg        = msg;
454                 msg_tmp->tag        = tag_data;
455                 msg_tmp->next       = 0;
456                 /* store into buffer smsg_list and send later */
457                 buffered_smsg_head  = msg_tmp;
458                 buffered_smsg_tail  = msg_tmp;
459                 return 0;
460             }
461             else
462                 GNI_RC_CHECK("GNI_SmsgSendWTag", status);
463         }else
464         {
465             /* construct a control message and send */
466             //control_msg_tmp = (CONTROL_MSG *)malloc(sizeof(CONTROL_MSG));
467             MallocControlMsg(control_msg_tmp);
468             control_msg_tmp->source_addr    = (uint64_t)msg;
469             control_msg_tmp->source         = myrank;
470             control_msg_tmp->length         = size; 
471             status = GNI_MemRegister(nic_hndl, (uint64_t)msg, 
472                 size, rx_cqh,
473                 GNI_MEM_READ_ONLY | GNI_MEM_USE_GART,
474                 vmdh_index, &(control_msg_tmp->source_mem_hndl));
475             
476             GNI_RC_CHECK("MemRegister fails at ", status);
477             status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, tag_control);
478             if(status == GNI_RC_SUCCESS)
479             {
480                 FreeControlMsg(control_msg_tmp);
481                 //CmiPrintf("Control message sent bytes:%d on PE:%d, source=%d, add=%p, mem_hndl=%ld, %ld\n", sizeof(CONTROL_MSG), myrank, control_msg_tmp->source, (void*)control_msg_tmp->source_addr, (control_msg_tmp->source_mem_hndl).qword1, (control_msg_tmp->source_mem_hndl).qword2);
482                 return 1;
483             }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE) 
484             {
485                 //CmiPrintf("[%d] control msg add to send queue\n", myrank);
486                 MallocMsgList(msg_tmp);
487                 msg_tmp->destNode   = destNode;
488                 msg_tmp ->size      = sizeof(CONTROL_MSG);
489                 msg_tmp->msg        = control_msg_tmp;
490                 msg_tmp->tag        = tag_control;
491                 msg_tmp->next       = 0;
492                 /* store into buffer smsg_list and send later */
493                 buffered_smsg_head = msg_tmp;
494                 buffered_smsg_tail = msg_tmp;
495                 return 0;
496             }
497             else
498                 GNI_RC_CHECK("GNI_SmsgSendWTag", status);
499         }
500     }
501 }
502
503 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
504 {
505     //PRINT_INFO("Calling LrtsSend")
506     if(useStaticSMSG)
507     {
508         send_with_smsg(destNode, size, msg); 
509     }else
510     {
511         send_with_fma(destNode, size, msg); 
512     }
513     return 0;
514 }
515
516 static void LrtsPreCommonInit(int everReturn){}
517 static void LrtsPostCommonInit(int everReturn){}
518
519 void LrtsPostNonLocal(){}
520 /* pooling CQ to receive network message */
521 static void PumpNetworkMsgs()
522 {
523     void                *header;
524     uint8_t             msg_tag, data_tag, control_tag, ack_tag;
525     gni_return_t        status;
526     uint64_t            inst_id;
527     gni_cq_entry_t      event_data;
528     int                 msg_nbytes;
529     void                *msg_data;
530     gni_mem_handle_t    msg_mem_hndl;
531     CONTROL_MSG         *request_msg;
532     gni_post_descriptor_t *pd;
533
534     data_tag        = DATA_TAG;
535     control_tag     = LMSG_INIT_TAG;
536     ack_tag         = ACK_TAG;
537     
538     while (1) {
539     status = GNI_CqGetEvent(rx_cqh, &event_data);
540     if(status == GNI_RC_SUCCESS)
541     {
542         inst_id = GNI_CQ_GET_INST_ID(event_data);
543     }else if (status == GNI_RC_NOT_DONE)
544     {
545         return;
546     }else
547     {
548         GNI_RC_CHECK("CQ Get event", status);
549     }
550   
551     msg_tag = GNI_SMSG_ANY_TAG;
552     status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
553
554     //CmiPrintf("++++## PumpNetwork Small msg is received on PE:%d message tag=%c\n", myrank, msg_tag);
555     if(status  == GNI_RC_SUCCESS)
556     {
557         /* copy msg out and then put into queue */
558         if(msg_tag == data_tag)
559         {
560             //memcpy(&msg_nbytes, header, sizeof(int));
561             msg_nbytes = *(int*)header;
562             msg_data    = CmiAlloc(msg_nbytes);
563             //CmiPrintf("[%d] PumpNetworkMsgs: get datamsg, size: %d msg id:%d\n", myrank, msg_nbytes, GNI_CQ_GET_MSG_ID(event_data));
564             memcpy(msg_data, (char*)header+sizeof(int), msg_nbytes);
565             handleOneRecvedMsg(msg_nbytes, msg_data);
566         }
567         else if(msg_tag == control_tag) 
568         {
569             /* initial a get to transfer data from the sender side */
570             request_msg = (CONTROL_MSG *) header;
571             msg_data = LrtsAllocRegister(request_msg->length, &msg_mem_hndl); //need align checking
572             //pd = (gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t));
573             MallocPostDesc(pd);
574             //bzero(&pd, sizeof(pd));
575             if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
576                 pd->type            = GNI_POST_FMA_GET;
577             else
578                 pd->type            = GNI_POST_RDMA_GET;
579
580             pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
581             pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
582             pd->length          = request_msg->length;
583             pd->local_addr      = (uint64_t) msg_data;
584             pd->local_mem_hndl  = msg_mem_hndl; 
585             pd->remote_addr     = request_msg->source_addr;
586             pd->remote_mem_hndl = request_msg->source_mem_hndl;
587             pd->src_cq_hndl     = 0;     /* tx_cqh;  */
588             pd->rdma_mode       = 0;
589
590            // CmiPrintf("source=%d, addr=%p, handler=%ld,%ld \n", request_msg->source, (void*)pd.remote_addr, (request_msg->source_mem_hndl).qword1, (request_msg->source_mem_hndl).qword2);
591             if(pd->type == GNI_POST_RDMA_GET) 
592                 status = GNI_PostRdma(ep_hndl_array[request_msg->source], pd);
593             else
594                 status = GNI_PostFma(ep_hndl_array[request_msg->source],  pd);
595             GNI_RC_CHECK("post ", status);
596            
597             //CmiPrintf("post status=%s on PE:%d\n", gni_err_str[status], myrank);
598             if(status = GNI_RC_SUCCESS)
599             {
600                 /* put into receive buffer queue */
601             }else
602             {
603             }
604         }
605         else if(msg_tag == ack_tag) {
606             /* Get is done, release message . Now put is not used yet*/
607             request_msg = (CONTROL_MSG *) header;
608             //CmiPrintf("++++## ACK msg is received on PE:%d message size=%d, addr=%p\n", myrank, request_msg->length, (void*)request_msg->source_addr);
609
610             GNI_MemDeregister(nic_hndl, &request_msg->source_mem_hndl);
611             CmiFree((void*)request_msg->source_addr);
612         }else{
613             CmiPrintf("weird tag problem\n");
614             CmiAbort("Unknown tag\n");
615         }
616         GNI_SmsgRelease(ep_hndl_array[inst_id]);
617     }  //else not match smsg, maybe larger message
618     }   // end of while loop
619 }
620
621 /* Check whether message send or get is confirmed by remote */
622 static void PumpLocalTransactions()
623 {
624     gni_cq_entry_t ev;
625     gni_return_t status;
626     uint64_t type, inst_id, data_addr;
627     uint8_t         ack_tag = ACK_TAG;
628     gni_post_descriptor_t *tmp_pd;
629     MSG_LIST *msg_tmp;
630     //gni_post_descriptor_t   ack_pd;
631     MSG_LIST  *ptr;
632     CONTROL_MSG *ack_msg_tmp;
633
634     while (1) 
635     {
636
637         status = GNI_CqGetEvent(tx_cqh, &ev);
638         if(status == GNI_RC_SUCCESS)
639         {
640             type        = GNI_CQ_GET_TYPE(ev);
641             inst_id     = GNI_CQ_GET_INST_ID(ev);
642             data_addr   = GNI_CQ_GET_DATA(ev);
643         }else if (status == GNI_RC_NOT_DONE)
644         {
645             return;
646         }else
647         {
648             GNI_RC_CHECK("CQ Get event", status);
649         }
650
651         if(type == GNI_CQ_EVENT_TYPE_POST)
652         {
653             status = GNI_GetCompleted(tx_cqh, ev, &tmp_pd);
654             GNI_RC_CHECK("Local CQ completed ", status);
655             //Message is sent, free message , put is not used now
656             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
657             {
658                 CmiFree((void *)tmp_pd->local_addr);
659             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
660             {
661                 /* Send an ACK to remote side */
662                 //CmiPrintf("\nPE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, SIZEFIELD((void*)tmp_pd->local_addr), tmp_pd->length, tmp_pd->remote_addr); 
663                 ////CmiPrintf("\n+PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, remote_length , tmp_pd->length, (void*)remote_addr); 
664                 MallocControlMsg(ack_msg_tmp);
665                 ack_msg_tmp->source = myrank;
666                 //CmiPrintf("\n++PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, SIZEFIELD((void*)tmp_pd->local_addr), tmp_pd->length, tmp_pd->remote_addr); 
667                 ////CmiPrintf("\n+++PE:%d Received large message by get , sizefield=%d, length=%d, addr=%p\n", myrank, remote_length , tmp_pd->length, (void*)remote_addr); 
668                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
669                 ack_msg_tmp->length=tmp_pd->length; 
670                 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
671                 //CmiPrintf("PE:%d sending ACK back addr=%p \n", myrank, ack_msg_tmp->source_addr); 
672            
673                 if(buffered_smsg_head!=0)
674                 {
675                     //CmiPrintf("[%d] PumpLocalTransactions: smsg buffered.\n", myrank);
676                     MallocMsgList(msg_tmp);
677                     msg_tmp->msg = ack_msg_tmp;
678                     msg_tmp ->size = sizeof(CONTROL_MSG);
679                     msg_tmp->tag = ack_tag;
680                     msg_tmp->destNode = inst_id;
681                     msg_tmp->next = 0;
682                     buffered_smsg_tail->next = msg_tmp;
683                     buffered_smsg_tail = msg_tmp;
684                 }else
685                 {
686                     //CmiPrintf("PE:%d sending ACK back addr=%p \n", myrank, ack_msg_tmp->source_addr); 
687                     status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, ack_tag);
688                     if(status == GNI_RC_SUCCESS)
689                     {
690                         FreeControlMsg(ack_msg_tmp);
691                     }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
692                     {
693                         // CmiPrintf("[%d] PumpLocalTransactions: ack smsg buffered.\n", myrank);
694                         MallocMsgList(msg_tmp);
695                         msg_tmp->msg = ack_msg_tmp;
696                         msg_tmp ->size = sizeof(CONTROL_MSG);
697                         msg_tmp->tag = ack_tag;
698                         msg_tmp->next = 0;
699                         msg_tmp->destNode = inst_id;
700                         buffered_smsg_head = msg_tmp;
701                         buffered_smsg_tail = msg_tmp;
702                     }
703                     else
704                         GNI_RC_CHECK("GNI_SmsgSendWTag", status);
705                 }
706                 handleOneRecvedMsg(SIZEFIELD((void*)tmp_pd->local_addr), (void*)tmp_pd->local_addr); 
707             }
708         }
709     }   /* end of while loop */
710 }
711
712 static int SendBufferMsg()
713 {
714     MSG_LIST        *ptr;
715     uint8_t         tag_data, tag_control, tag_ack;
716     gni_return_t     status;
717
718     tag_data    = DATA_TAG;
719     tag_control = LMSG_INIT_TAG;
720     tag_ack     = ACK_TAG;
721     /* can add flow control here to control the number of messages sent before handle message */
722     while(buffered_smsg_head != 0)
723     {
724         if(useStaticSMSG)
725         {
726             ptr = buffered_smsg_head;
727             if(ptr->tag == tag_data)
728             {
729                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], &(ptr->size), (uint32_t)sizeof(int), ptr->msg, ptr->size, 0, tag_data);
730                 if(status == GNI_RC_SUCCESS)
731                     CmiFree(ptr->msg);
732             }else if(ptr->tag ==tag_control)
733             {
734                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, tag_control);
735                 if(status == GNI_RC_SUCCESS)
736                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
737             }else if (ptr->tag == tag_ack)
738             {
739                 status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, tag_ack);
740                 if(status == GNI_RC_SUCCESS)
741                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
742             }
743         } else if(useStaticMSGQ)
744         {
745             CmiPrintf("MSGQ Send not done\n");
746         }else
747         {
748             CmiPrintf("FMA Send not done\n");
749         }
750         if(status == GNI_RC_SUCCESS)
751         {
752             buffered_smsg_head= buffered_smsg_head->next;
753             FreeMsgList(ptr);
754         }else
755             break;
756     }
757     return 1;
758 }
759
760 static void LrtsAdvanceCommunication()
761 {
762     /*  Receive Msg first */
763     //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
764     PumpNetworkMsgs();
765     /* Release Sent Msg */
766     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
767     PumpLocalTransactions();
768     //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
769     /* Send buffered Message */
770     SendBufferMsg();
771 }
772
773 void remoteEventHandle(gni_cq_entry_t *event_data, void *context)
774 {
775     gni_return_t status, source_data, source_control;
776     uint64_t            source;
777     void                *header;
778     uint8_t             tag_data;
779     uint8_t             tag_control;
780
781     tag_data = DATA_TAG;
782     tag_control = LMSG_INIT_TAG;
783     /* pool the CQ to check which smsg endpoint to get the data */
784     //status = GNI_CqGetEvent(remote_cq_hndl, &event_data);
785         
786     /* check whether it is data or control information */
787     source = GNI_CQ_GET_SOURCE(*event_data);
788
789     if((status = GNI_SmsgGetNextWTag(ep_hndl_array[source], &header, &tag_data)) == GNI_RC_SUCCESS)
790     {
791         /* copy msg out and then put into queue */
792
793     } else if ((status = GNI_SmsgGetNextWTag(ep_hndl_array[source], &header, &tag_control)) == GNI_RC_SUCCESS)
794     {
795         /* initial a get to transfer data from the sender side */
796     } else
797     {
798
799     }
800
801 }
802
803 /* if FMA is used to transfer small messages, static allocation*/
804 static void _init_smallMsgwithFma(int size)
805 {
806     char            *fma_buffer;
807     gni_mem_handle_t      fma_buffer_mdh_addr;
808     mdh_addr_t            my_mdh_addr;
809     gni_return_t status;
810     
811     fma_buffer = (char *)calloc(FMA_PER_CORE*size, 1);
812     CmiAssert(fma_buffer != NULL);
813
814     status = GNI_MemRegister(nic_hndl, (uint64_t)fma_buffer,
815         FMA_PER_CORE*size, rx_cqh, 
816         GNI_MEM_READWRITE | GNI_MEM_USE_GART, -1,
817         &fma_buffer_mdh_addr);
818
819     GNI_RC_CHECK("Memregister DMA ", status);
820     /* Gather up all of the mdh's over the socket network, 
821      * * this also serves as a barrier */
822     fma_buffer_mdh_addr_base = (mdh_addr_t*)malloc(size* sizeof(mdh_addr_t));
823     CmiAssert(fma_buffer_mdh_addr_base);
824
825     my_mdh_addr.addr = (uint64_t)fma_buffer;
826     my_mdh_addr.mdh = fma_buffer_mdh_addr;
827     allgather(&my_mdh_addr, fma_buffer_mdh_addr_base, sizeof(mdh_addr_t));      
828
829 }
830
831 static void _init_static_smsg()
832 {
833     gni_smsg_attr_t      *smsg_attr;
834     gni_smsg_attr_t      *smsg_attr_vec;
835     unsigned int         smsg_memlen;
836     gni_mem_handle_t     my_smsg_mdh_mailbox;
837     register    int      i;
838     gni_return_t status;
839     uint32_t              vmdh_index = -1;
840
841      smsg_attr = (gni_smsg_attr_t *)malloc(mysize*sizeof(gni_smsg_attr_t));
842     _MEMCHECK(smsg_attr);
843
844     smsg_attr_ptr = malloc(sizeof(void*) *mysize);
845     for(i=0; i<mysize; i++)
846     {
847         if(i==myrank)
848             continue;
849         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
850         smsg_attr[i].mbox_offset = 0;
851         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
852         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
853         status = GNI_SmsgBufferSizeNeeded(&smsg_attr[i], &smsg_memlen);
854         GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
855
856         smsg_attr_ptr[i] = memalign(64, smsg_memlen);
857         _MEMCHECK(smsg_attr_ptr[i]);
858         bzero(smsg_attr_ptr[i], smsg_memlen);
859         status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_attr_ptr[i],
860             smsg_memlen, rx_cqh,
861             GNI_MEM_READWRITE | GNI_MEM_USE_GART | GNI_MEM_PI_FLUSH,   
862             vmdh_index,
863             &my_smsg_mdh_mailbox);
864
865         GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
866       
867         smsg_attr[i].msg_buffer = smsg_attr_ptr[i];
868         smsg_attr[i].buff_size = smsg_memlen;
869         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
870     }
871     smsg_attr_vec = (gni_smsg_attr_t*)malloc(mysize * mysize * sizeof(gni_smsg_attr_t));
872     CmiAssert(smsg_attr_vec);
873    
874     allgather(smsg_attr, smsg_attr_vec,  mysize*sizeof(gni_smsg_attr_t));
875     //MPI_Alltoall(smsg_attr, sizeof(gni_smsg_attr_t), MPI_BYTE, smsg_attr_vec, sizeof(gni_smsg_attr_t), MPI_BYTE, MPI_COMM_WORLD);
876     for(i=0; i<mysize; i++)
877     {
878         if (myrank == i) continue;
879         /* initialize the smsg channel */
880         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &smsg_attr_vec[i*mysize+myrank]);
881         GNI_RC_CHECK("SMSG Init", status);
882     } //end initialization
883     free(smsg_attr);
884     free(smsg_attr_vec);
885
886
887 static void _init_static_msgq()
888 {
889     gni_return_t status;
890     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
891     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
892     msgq_attrs.smsg_q_sz = 1;
893     msgq_attrs.rcv_pool_sz = 1;
894     msgq_attrs.num_msgq_eps = 2;
895     msgq_attrs.nloc_insts = 8;
896     msgq_attrs.modes = 0;
897     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
898
899     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
900     GNI_RC_CHECK("MSGQ Init", status);
901
902
903 }
904 static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
905 {
906     register int            i;
907     int                     rc;
908     int                     device_id = 0;
909     unsigned int            remote_addr;
910     gni_cdm_handle_t        cdm_hndl;
911     gni_return_t            status = GNI_RC_SUCCESS;
912     uint32_t                vmdh_index = -1;
913     uint8_t                 ptag;
914     unsigned int            local_addr, *MPID_UGNI_AllAddr;
915     int                     first_spawned;
916
917     void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
918     void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
919     void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
920     
921     //useStaticSMSG = CmiGetArgFlag(*argv, "+useStaticSmsg");
922     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
923     
924     status = PMI_Init(&first_spawned);
925     GNI_RC_CHECK("PMI_Init", status);
926
927     status = PMI_Get_size(&mysize);
928     GNI_RC_CHECK("PMI_Getsize", status);
929
930     status = PMI_Get_rank(&myrank);
931     GNI_RC_CHECK("PMI_getrank", status);
932
933     *myNodeID = myrank;
934     *numNodes = mysize;
935   
936     if(myrank == 0)
937     {
938         printf("Charm++> Running on Gemini (GNI)\n");
939     }
940 #ifdef USE_ONESIDED
941     onesided_init(NULL, &onesided_hnd);
942
943     // this is a GNI test, so use the libonesided bypass functionality
944     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
945     local_addr = gniGetNicAddress();
946 #else
947     ptag = get_ptag();
948     cookie = get_cookie();
949     
950     //Create and attach to the communication  domain */
951     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
952     GNI_RC_CHECK("GNI_CdmCreate", status);
953     //* device id The device id is the minor number for the device
954     //that is assigned to the device by the system when the device is created.
955     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
956     //where X is the device number 0 default 
957     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
958     GNI_RC_CHECK("GNI_CdmAttach", status);
959     local_addr = get_gni_nic_address(0);
960 #endif
961     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
962     _MEMCHECK(MPID_UGNI_AllAddr);
963     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
964     /* create the local completion queue */
965     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
966     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &tx_cqh);
967     //status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, &local_event_handler, NULL, &tx_cqh);
968     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
969     
970     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
971
972     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rx_cqh);
973     GNI_RC_CHECK("Create CQ (rx)", status);
974     
975     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &remote_bte_cq_hndl);
976     //GNI_RC_CHECK("Create BTE CQ", status);
977
978     /* create the endpoints. they need to be bound to allow later CQWrites to them */
979     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
980     _MEMCHECK(ep_hndl_array);
981
982     for (i=0; i<mysize; i++) {
983         if(i == myrank) continue;
984         status = GNI_EpCreate(nic_hndl, tx_cqh, &ep_hndl_array[i]);
985         GNI_RC_CHECK("GNI_EpCreate ", status);   
986         remote_addr = MPID_UGNI_AllAddr[i];
987         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
988         GNI_RC_CHECK("GNI_EpBind ", status);   
989     }
990     /* Depending on the number of cores in the job, decide different method */
991     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
992     if(useStaticSMSG == 1)
993     {
994         _init_static_smsg(mysize);
995     }else if(useStaticMSGQ == 1)
996     {
997         _init_static_msgq();
998     }else if( useStaticFMA == 1)
999     {
1000         _init_smallMsgwithFma(mysize);
1001     }
1002     free(MPID_UGNI_AllAddr);
1003     //PRINT_INFO("\nDone with LrtsInit")
1004 }
1005
1006 #define ALIGNBUF                64
1007
1008 void* LrtsAlloc(int n_bytes, int header)
1009 {
1010     if(n_bytes <= SMSG_MAX_MSG)
1011     {
1012         int totalsize = n_bytes+header;
1013         return malloc(totalsize);
1014 /*
1015     }else if(n_bytes <= LRTS_GNI_RDMA_THRESHOLD)
1016     {
1017         return malloc(n_bytes);
1018 */
1019     }else 
1020     {
1021         CmiAssert(header <= ALIGNBUF);
1022         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
1023         return res + ALIGNBUF - header;
1024     }
1025 }
1026
1027 void  LrtsFree(void *msg)
1028 {
1029     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
1030     if (size <= SMSG_MAX_MSG)
1031       free(msg);
1032     else
1033       free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
1034 }
1035
1036 static void* LrtsAllocRegister(int n_bytes, gni_mem_handle_t* mem_hndl)
1037 {
1038     void *ptr;
1039     gni_return_t status;
1040     ptr = CmiAlloc(n_bytes);
1041     _MEMCHECK(ptr);
1042     status = GNI_MemRegister(nic_hndl, (uint64_t)ptr,
1043         n_bytes, rx_cqh, 
1044         GNI_MEM_READWRITE | GNI_MEM_USE_GART, -1,
1045         mem_hndl);
1046
1047     GNI_RC_CHECK("GNI_MemRegister in LrtsAlloc", status);
1048     return ptr;
1049
1050 }
1051
1052 static void LrtsExit()
1053 {
1054     /* free memory ? */
1055     PMI_Finalize();
1056     exit(0);
1057 }
1058
1059 static void LrtsDrainResources()
1060 {
1061     while (!SendBufferMsg()) {
1062       PumpNetworkMsgs();
1063       PumpLocalTransactions();
1064     }
1065 }
1066
1067 void CmiAbort(const char *message) {
1068
1069     CmiPrintf("CmiAbort is calling on PE:%d\n", myrank);
1070     PMI_Abort(-1, message);
1071 }
1072
1073 #if 0
1074 /**************************  TIMER FUNCTIONS **************************/
1075 #if CMK_TIMER_USE_SPECIAL
1076 /* MPI calls are not threadsafe, even the timer on some machines */
1077 static CmiNodeLock  timerLock = 0;
1078 static int _absoluteTime = 0;
1079 static double starttimer = 0;
1080 static int _is_global = 0;
1081
1082 int CmiTimerIsSynchronized() {
1083     int  flag;
1084     void *v;
1085
1086     /*  check if it using synchronized timer */
1087     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1088         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1089     if (flag) {
1090         _is_global = *(int*)v;
1091         if (_is_global && CmiMyPe() == 0)
1092             printf("Charm++> MPI timer is synchronized\n");
1093     }
1094     return _is_global;
1095 }
1096
1097 int CmiTimerAbsolute() {
1098     return _absoluteTime;
1099 }
1100
1101 double CmiStartTimer() {
1102     return 0.0;
1103 }
1104
1105 double CmiInitTime() {
1106     return starttimer;
1107 }
1108
1109 void CmiTimerInit(char **argv) {
1110     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1111     if (_absoluteTime && CmiMyPe() == 0)
1112         printf("Charm++> absolute MPI timer is used\n");
1113     _is_global = CmiTimerIsSynchronized();
1114
1115     if (_is_global) {
1116         if (CmiMyRank() == 0) {
1117             double minTimer;
1118 #if CMK_TIMER_USE_XT3_DCLOCK
1119             starttimer = dclock();
1120 #else
1121             starttimer = MPI_Wtime();
1122 #endif
1123
1124             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1125                 MPI_COMM_WORLD );
1126             starttimer = minTimer;
1127         }
1128     } else { /* we don't have a synchronous timer, set our own start time */
1129         CmiBarrier();
1130         CmiBarrier();
1131         CmiBarrier();
1132 #if CMK_TIMER_USE_XT3_DCLOCK
1133         starttimer = dclock();
1134 #else
1135         starttimer = MPI_Wtime();
1136 #endif
1137     }
1138
1139     CmiNodeAllBarrier();          /* for smp */
1140 }
1141
1142 /**
1143  * Since the timerLock is never created, and is
1144  * always NULL, then all the if-condition inside
1145  * the timer functions could be disabled right
1146  * now in the case of SMP.
1147  */
1148 double CmiTimer(void) {
1149     double t;
1150 #if CMK_TIMER_USE_XT3_DCLOCK
1151     t = dclock();
1152 #else
1153     t = MPI_Wtime();
1154 #endif
1155     return _absoluteTime?t: (t-starttimer);
1156 }
1157
1158 double CmiWallTimer(void) {
1159     double t;
1160 #if CMK_TIMER_USE_XT3_DCLOCK
1161     t = dclock();
1162 #else
1163     t = MPI_Wtime();
1164 #endif
1165     return _absoluteTime? t: (t-starttimer);
1166 }
1167
1168 double CmiCpuTimer(void) {
1169     double t;
1170 #if CMK_TIMER_USE_XT3_DCLOCK
1171     t = dclock() - starttimer;
1172 #else
1173     t = MPI_Wtime() - starttimer;
1174 #endif
1175     return t;
1176 }
1177
1178 #endif
1179 #endif
1180 /************Barrier Related Functions****************/
1181
1182 int CmiBarrier()
1183 {
1184     int status;
1185     status = PMI_Barrier();
1186     return status;
1187
1188 }