2e47be3243107bc0632084575484bffc936294d9
[charm.git] / src / arch / origin-mpi / machine.c
1 #include <stdio.h>
2 #include <sys/time.h>
3 #include "converse.h"
4 #include <mpi.h>
5
6 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
7 #define MAX_QLEN 200
8
9 int Cmi_mype;
10 int Cmi_numpes;
11 int Cmi_myrank;
12 CpvDeclare(void*, CmiLocalQueue);
13
14 #define BLK_LEN  512
15
16 static int MsgQueueLen=0;
17 static int request_max;
18 static void **recdQueue_blk;
19 static unsigned int recdQueue_blk_len;
20 static unsigned int recdQueue_first;
21 static unsigned int recdQueue_len;
22 static void recdQueueInit(void);
23 static void recdQueueAddToBack(void *element);
24 static void *recdQueueRemoveFromFront(void);
25
26 typedef struct msg_list {
27      MPI_Request req;
28      char *msg;
29      struct msg_list *next;
30 } SMSG_LIST;
31
32 static int Cmi_dim;
33 static double itime;
34
35 static SMSG_LIST *sent_msgs=0;
36 static SMSG_LIST *end_sent=0;
37
38 #if NODE_0_IS_CONVHOST
39 int inside_comm = 0;
40 #endif
41
42 double starttimer;
43
44 void CmiAbort(const char *message);
45
46 /**************************  TIMER FUNCTIONS **************************/
47
48 void CmiTimerInit(void)
49 {
50   starttimer = MPI_Wtime();
51 }
52
53 double CmiTimer(void)
54 {
55   return MPI_Wtime() - starttimer;
56 }
57
58 double CmiWallTimer(void)
59 {
60   return MPI_Wtime() - starttimer;
61 }
62
63 double CmiCpuTimer(void)
64 {
65   return MPI_Wtime() - starttimer;
66 }
67
68 static int CmiAllAsyncMsgsSent(void)
69 {
70    SMSG_LIST *msg_tmp = sent_msgs;
71    MPI_Status sts;
72    int done;
73      
74    while(msg_tmp!=0) {
75     done = 0;
76     MPI_Test(&(msg_tmp->req), &done, &sts);
77     if(!done)
78       return 0;
79     msg_tmp = msg_tmp->next;
80     MsgQueueLen--;
81    }
82    return 1;
83 }
84
85 int CmiAsyncMsgSent(CmiCommHandle c) {
86      
87   SMSG_LIST *msg_tmp = sent_msgs;
88   int done;
89   MPI_Status sts;
90
91   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
92     msg_tmp = msg_tmp->next;
93   if(msg_tmp) {
94     done = 0;
95     MPI_Test(&(msg_tmp->req), &done, &sts);
96     return ((done)?1:0);
97   } else {
98     return 1;
99   }
100 }
101
102 void CmiReleaseCommHandle(CmiCommHandle c)
103 {
104   return;
105 }
106
107
108 static void CmiReleaseSentMessages(void)
109 {
110   SMSG_LIST *msg_tmp=sent_msgs;
111   SMSG_LIST *prev=0;
112   SMSG_LIST *temp;
113   int done;
114   MPI_Status sts;
115      
116   while(msg_tmp!=0) {
117     done =0;
118     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
119       CmiAbort("MPI_Test failed\n");
120     if(done) {
121       MsgQueueLen--;
122       /* Release the message */
123       temp = msg_tmp->next;
124       if(prev==0)  /* first message */
125         sent_msgs = temp;
126       else
127         prev->next = temp;
128       CmiFree(msg_tmp->msg);
129       CmiFree(msg_tmp);
130       msg_tmp = temp;
131     } else {
132       prev = msg_tmp;
133       msg_tmp = msg_tmp->next;
134     }
135   }
136   end_sent = prev;
137 }
138
139 static int PumpMsgs(void)
140 {
141   int nbytes, flg, res;
142   char *msg;
143   MPI_Status sts;
144   int recd=0;
145
146   while(1) {
147     flg = 0;
148     res = MPI_Iprobe(MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &flg, &sts);
149     if(res != MPI_SUCCESS)
150       CmiAbort("MPI_Iprobe failed\n");
151     if(!flg)
152       return recd;
153     recd = 1;
154     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
155     msg = (char *) CmiAlloc(nbytes);
156     MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,1,MPI_COMM_WORLD,&sts);
157     recdQueueAddToBack(msg);
158   }
159 }
160
161 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
162
163 void *CmiGetNonLocal(void)
164 {
165   void *msg = recdQueueRemoveFromFront();
166   if(!msg) {
167     CmiReleaseSentMessages();
168     if (PumpMsgs())
169       return recdQueueRemoveFromFront();
170     else
171       return 0;
172   }
173   return msg;
174 }
175
176 void CmiNotifyIdle(void)
177 {
178   CmiReleaseSentMessages();
179   PumpMsgs();
180 }
181  
182 /********************* MESSAGE SEND FUNCTIONS ******************/
183
184 void CmiSyncSendFn(int destPE, int size, char *msg)
185 {
186   char *dupmsg = (char *) CmiAlloc(size);
187   memcpy(dupmsg, msg, size);
188   if (Cmi_mype==destPE)
189     FIFO_EnQueue(CpvAccess(CmiLocalQueue),dupmsg);
190   else
191     CmiAsyncSendFn(destPE, size, dupmsg);
192 }
193
194
195 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
196 {
197   SMSG_LIST *msg_tmp;
198   int res;
199      
200   if(destPE == CmiMyPe()) {
201     char *dupmsg = (char *) CmiAlloc(size);
202     memcpy(dupmsg, msg, size);
203     FIFO_EnQueue(CpvAccess(CmiLocalQueue),dupmsg);
204     return 0;
205   }
206   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
207   msg_tmp->msg = msg;
208   msg_tmp->next = 0;
209   while (MsgQueueLen > request_max) {
210         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
211         CmiReleaseSentMessages();
212   }
213   res = MPI_Isend((void *)msg,size,MPI_BYTE,destPE,1,MPI_COMM_WORLD,&(msg_tmp->req));
214   MsgQueueLen++;
215   if(sent_msgs==0)
216     sent_msgs = msg_tmp;
217   else
218     end_sent->next = msg_tmp;
219   end_sent = msg_tmp;
220   return (CmiCommHandle) &(msg_tmp->req);
221 }
222
223 void CmiFreeSendFn(int destPE, int size, char *msg)
224 {
225   if (Cmi_mype==destPE) {
226     FIFO_EnQueue(CpvAccess(CmiLocalQueue),msg);
227   } else {
228     CmiAsyncSendFn(destPE, size, msg);
229   }
230 }
231
232
233 /*********************** BROADCAST FUNCTIONS **********************/
234
235 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
236 {
237   int i ;
238      
239   for ( i=Cmi_mype+1; i<Cmi_numpes; i++ ) 
240     CmiSyncSendFn(i, size,msg) ;
241   for ( i=0; i<Cmi_mype; i++ ) 
242     CmiSyncSendFn(i, size,msg) ;
243 }
244
245
246 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)  
247 {
248   int i ;
249
250   for ( i=Cmi_mype+1; i<Cmi_numpes; i++ ) 
251     CmiAsyncSendFn(i,size,msg) ;
252   for ( i=0; i<Cmi_mype; i++ ) 
253     CmiAsyncSendFn(i,size,msg) ;
254   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
255 }
256
257 void CmiFreeBroadcastFn(int size, char *msg)
258 {
259    CmiSyncBroadcastFn(size,msg);
260    CmiFree(msg);
261 }
262  
263 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
264 {
265   int i ;
266      
267   for ( i=0; i<Cmi_numpes; i++ ) 
268     CmiSyncSendFn(i,size,msg) ;
269 }
270
271 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)  
272 {
273   int i ;
274
275   for ( i=1; i<Cmi_numpes; i++ ) 
276     CmiAsyncSendFn(i,size,msg) ;
277   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
278 }
279
280 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
281 {
282   int i ;
283      
284   for ( i=0; i<Cmi_numpes; i++ ) 
285     CmiSyncSendFn(i,size,msg) ;
286   CmiFree(msg) ;
287 }
288
289 /* Neighbour functions used mainly in LDB : pretend the SP1 is a hypercube */
290
291 int CmiNumNeighbours(int node)
292 {
293   return Cmi_dim;
294 }
295
296 void CmiGetNodeNeighbours(int node, int *neighbours)
297 {
298   int i;
299      
300   for (i = 0; i < Cmi_dim; i++)
301     neighbours[i] = FLIPBIT(node,i);
302 }
303
304 int CmiNeighboursIndex(int node, int neighbour)
305 {
306   int index = 0;
307   int linenum = node ^ neighbour;
308
309   while (linenum > 1) {
310     linenum = linenum >> 1;
311     index++;
312   }
313   return index;
314 }
315
316 /************************** MAIN ***********************************/
317 #define MPI_REQUEST_MAX=1024*10 
318
319 void ConverseExit(void)
320 {
321   ConverseCommonExit();
322   MPI_Finalize();
323 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
324   if (CmiMyPe() == 0){
325     CmiPrintf("End of program\n");
326   }
327 #endif
328   exit(0);
329 }
330
331 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
332 {
333   int n,i ;
334   int nbuf[4];
335   
336   Cmi_myrank = 0;
337   MPI_Init(&argc, &argv);
338   MPI_Comm_size(MPI_COMM_WORLD, &Cmi_numpes);
339   MPI_Comm_rank(MPI_COMM_WORLD, &Cmi_mype);
340   /* find dim = log2(numpes), to pretend we are a hypercube */
341   for ( Cmi_dim=0,n=Cmi_numpes; n>1; n/=2 )
342     Cmi_dim++ ;
343  /* CmiSpanTreeInit();*/
344   i=0;
345   request_max=MAX_QLEN;
346   while (argv[i] != 0) {
347     if (strncmp(argv[i], "+requestmax",11) == 0) {
348       if (strlen(argv[i]) > 11)
349         sscanf(argv[i], "+p%d", &request_max);
350       else
351         sscanf(argv[i+1], "%d", &request_max);
352     } 
353     i++;
354   }
355   /*printf("request max=%d\n", request_max);*/
356   CmiTimerInit();
357   CpvInitialize(void *, CmiLocalQueue);
358   CpvAccess(CmiLocalQueue) = (void *)FIFO_Create();
359   recdQueueInit();
360   CthInit(argv);
361   ConverseCommonInit(argv);
362   if (initret==0) {
363     fn(argc, argv);
364     if (usched==0) CsdScheduler(-1);
365     ConverseExit();
366   }
367 }
368
369 /***********************************************************************
370  *
371  * Abort function:
372  *
373  ************************************************************************/
374
375 void CmiAbort(const char *message)
376 {
377   CmiError(message);
378   MPI_Abort(MPI_COMM_WORLD, 1);
379 }
380  
381 /* ****************************************************************** */
382 /*    The following internal functions implement recd msg queue       */
383 /* ****************************************************************** */
384
385 static void ** AllocBlock(unsigned int len)
386 {
387   void ** blk;
388
389   blk=(void **)CmiAlloc(len*sizeof(void *));
390   if(blk==(void **)0) {
391     CmiError("Cannot Allocate Memory!\n");
392     MPI_Abort(MPI_COMM_WORLD, 1);
393   }
394   return blk;
395 }
396
397 static void 
398 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
399 {
400   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
401   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
402 }
403
404 void recdQueueInit(void)
405 {
406   recdQueue_blk = AllocBlock(BLK_LEN);
407   recdQueue_blk_len = BLK_LEN;
408   recdQueue_first = 0;
409   recdQueue_len = 0;
410 }
411
412 void recdQueueAddToBack(void *element)
413 {
414   inside_comm = 1;
415   if(recdQueue_len==recdQueue_blk_len) {
416     void **blk;
417     recdQueue_blk_len *= 3;
418     blk = AllocBlock(recdQueue_blk_len);
419     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
420     CmiFree(recdQueue_blk);
421     recdQueue_blk = blk;
422     recdQueue_first = 0;
423   }
424   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
425   inside_comm = 0;
426 }
427
428
429 void * recdQueueRemoveFromFront(void)
430 {
431   if(recdQueue_len) {
432     void *element;
433     element = recdQueue_blk[recdQueue_first++];
434     recdQueue_first %= recdQueue_blk_len;
435     recdQueue_len--;
436     return element;
437   }
438   return 0;
439 }
440