Removed almost all warnings on origin2000.
[charm.git] / src / arch / origin-mpi / machine.c
1 #include <stdio.h>
2 #include <sys/time.h>
3 #include "converse.h"
4 #include <mpi.h>
5
6 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
7 #define MAX_QLEN 200
8
9 int Cmi_mype;
10 int Cmi_numpes;
11 int Cmi_myrank;
12 CpvDeclare(void*, CmiLocalQueue);
13
14 #define BLK_LEN  512
15
16 static int MsgQueueLen=0;
17 static int request_max;
18 static void **recdQueue_blk;
19 static unsigned int recdQueue_blk_len;
20 static unsigned int recdQueue_first;
21 static unsigned int recdQueue_len;
22 static void recdQueueInit(void);
23 static void recdQueueAddToBack(void *element);
24 static void *recdQueueRemoveFromFront(void);
25
26 typedef struct msg_list {
27      MPI_Request req;
28      char *msg;
29      struct msg_list *next;
30 } SMSG_LIST;
31
32 static int Cmi_dim;
33
34 static SMSG_LIST *sent_msgs=0;
35 static SMSG_LIST *end_sent=0;
36
37 #if NODE_0_IS_CONVHOST
38 int inside_comm = 0;
39 #endif
40
41 double starttimer;
42
43 void CmiAbort(const char *message);
44
45 /**************************  TIMER FUNCTIONS **************************/
46
47 void CmiTimerInit(void)
48 {
49   starttimer = MPI_Wtime();
50 }
51
52 double CmiTimer(void)
53 {
54   return MPI_Wtime() - starttimer;
55 }
56
57 double CmiWallTimer(void)
58 {
59   return MPI_Wtime() - starttimer;
60 }
61
62 double CmiCpuTimer(void)
63 {
64   return MPI_Wtime() - starttimer;
65 }
66
67 static int CmiAllAsyncMsgsSent(void)
68 {
69    SMSG_LIST *msg_tmp = sent_msgs;
70    MPI_Status sts;
71    int done;
72      
73    while(msg_tmp!=0) {
74     done = 0;
75     MPI_Test(&(msg_tmp->req), &done, &sts);
76     if(!done)
77       return 0;
78     msg_tmp = msg_tmp->next;
79     MsgQueueLen--;
80    }
81    return 1;
82 }
83
84 int CmiAsyncMsgSent(CmiCommHandle c) {
85      
86   SMSG_LIST *msg_tmp = sent_msgs;
87   int done;
88   MPI_Status sts;
89
90   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
91     msg_tmp = msg_tmp->next;
92   if(msg_tmp) {
93     done = 0;
94     MPI_Test(&(msg_tmp->req), &done, &sts);
95     return ((done)?1:0);
96   } else {
97     return 1;
98   }
99 }
100
101 void CmiReleaseCommHandle(CmiCommHandle c)
102 {
103   return;
104 }
105
106
107 static void CmiReleaseSentMessages(void)
108 {
109   SMSG_LIST *msg_tmp=sent_msgs;
110   SMSG_LIST *prev=0;
111   SMSG_LIST *temp;
112   int done;
113   MPI_Status sts;
114      
115   while(msg_tmp!=0) {
116     done =0;
117     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
118       CmiAbort("MPI_Test failed\n");
119     if(done) {
120       MsgQueueLen--;
121       /* Release the message */
122       temp = msg_tmp->next;
123       if(prev==0)  /* first message */
124         sent_msgs = temp;
125       else
126         prev->next = temp;
127       CmiFree(msg_tmp->msg);
128       CmiFree(msg_tmp);
129       msg_tmp = temp;
130     } else {
131       prev = msg_tmp;
132       msg_tmp = msg_tmp->next;
133     }
134   }
135   end_sent = prev;
136 }
137
138 static int PumpMsgs(void)
139 {
140   int nbytes, flg, res;
141   char *msg;
142   MPI_Status sts;
143   int recd=0;
144
145   while(1) {
146     flg = 0;
147     res = MPI_Iprobe(MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &flg, &sts);
148     if(res != MPI_SUCCESS)
149       CmiAbort("MPI_Iprobe failed\n");
150     if(!flg)
151       return recd;
152     recd = 1;
153     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
154     msg = (char *) CmiAlloc(nbytes);
155     MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,1,MPI_COMM_WORLD,&sts);
156     recdQueueAddToBack(msg);
157   }
158 }
159
160 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
161
162 void *CmiGetNonLocal(void)
163 {
164   void *msg = recdQueueRemoveFromFront();
165   if(!msg) {
166     CmiReleaseSentMessages();
167     if (PumpMsgs())
168       return recdQueueRemoveFromFront();
169     else
170       return 0;
171   }
172   return msg;
173 }
174
175 void CmiNotifyIdle(void)
176 {
177   CmiReleaseSentMessages();
178   PumpMsgs();
179 }
180  
181 /********************* MESSAGE SEND FUNCTIONS ******************/
182
183 void CmiSyncSendFn(int destPE, int size, char *msg)
184 {
185   char *dupmsg = (char *) CmiAlloc(size);
186   memcpy(dupmsg, msg, size);
187   if (Cmi_mype==destPE)
188     FIFO_EnQueue(CpvAccess(CmiLocalQueue),dupmsg);
189   else
190     CmiAsyncSendFn(destPE, size, dupmsg);
191 }
192
193
194 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
195 {
196   SMSG_LIST *msg_tmp;
197      
198   if(destPE == CmiMyPe()) {
199     char *dupmsg = (char *) CmiAlloc(size);
200     memcpy(dupmsg, msg, size);
201     FIFO_EnQueue(CpvAccess(CmiLocalQueue),dupmsg);
202     return 0;
203   }
204   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
205   msg_tmp->msg = msg;
206   msg_tmp->next = 0;
207   while (MsgQueueLen > request_max) {
208         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
209         CmiReleaseSentMessages();
210   }
211   MPI_Isend((void *)msg,size,MPI_BYTE,destPE,1,MPI_COMM_WORLD,&(msg_tmp->req));
212   MsgQueueLen++;
213   if(sent_msgs==0)
214     sent_msgs = msg_tmp;
215   else
216     end_sent->next = msg_tmp;
217   end_sent = msg_tmp;
218   return (CmiCommHandle) &(msg_tmp->req);
219 }
220
221 void CmiFreeSendFn(int destPE, int size, char *msg)
222 {
223   if (Cmi_mype==destPE) {
224     FIFO_EnQueue(CpvAccess(CmiLocalQueue),msg);
225   } else {
226     CmiAsyncSendFn(destPE, size, msg);
227   }
228 }
229
230
231 /*********************** BROADCAST FUNCTIONS **********************/
232
233 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
234 {
235   int i ;
236      
237   for ( i=Cmi_mype+1; i<Cmi_numpes; i++ ) 
238     CmiSyncSendFn(i, size,msg) ;
239   for ( i=0; i<Cmi_mype; i++ ) 
240     CmiSyncSendFn(i, size,msg) ;
241 }
242
243
244 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)  
245 {
246   int i ;
247
248   for ( i=Cmi_mype+1; i<Cmi_numpes; i++ ) 
249     CmiAsyncSendFn(i,size,msg) ;
250   for ( i=0; i<Cmi_mype; i++ ) 
251     CmiAsyncSendFn(i,size,msg) ;
252   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
253 }
254
255 void CmiFreeBroadcastFn(int size, char *msg)
256 {
257    CmiSyncBroadcastFn(size,msg);
258    CmiFree(msg);
259 }
260  
261 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
262 {
263   int i ;
264      
265   for ( i=0; i<Cmi_numpes; i++ ) 
266     CmiSyncSendFn(i,size,msg) ;
267 }
268
269 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)  
270 {
271   int i ;
272
273   for ( i=1; i<Cmi_numpes; i++ ) 
274     CmiAsyncSendFn(i,size,msg) ;
275   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
276 }
277
278 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
279 {
280   int i ;
281      
282   for ( i=0; i<Cmi_numpes; i++ ) 
283     CmiSyncSendFn(i,size,msg) ;
284   CmiFree(msg) ;
285 }
286
287 /* Neighbour functions used mainly in LDB : pretend the SP1 is a hypercube */
288
289 int CmiNumNeighbours(int node)
290 {
291   return Cmi_dim;
292 }
293
294 void CmiGetNodeNeighbours(int node, int *neighbours)
295 {
296   int i;
297      
298   for (i = 0; i < Cmi_dim; i++)
299     neighbours[i] = FLIPBIT(node,i);
300 }
301
302 int CmiNeighboursIndex(int node, int neighbour)
303 {
304   int index = 0;
305   int linenum = node ^ neighbour;
306
307   while (linenum > 1) {
308     linenum = linenum >> 1;
309     index++;
310   }
311   return index;
312 }
313
314 /************************** MAIN ***********************************/
315 #define MPI_REQUEST_MAX=1024*10 
316
317 void ConverseExit(void)
318 {
319   ConverseCommonExit();
320   MPI_Finalize();
321 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
322   if (CmiMyPe() == 0){
323     CmiPrintf("End of program\n");
324   }
325 #endif
326   exit(0);
327 }
328
329 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
330 {
331   int n,i ;
332   
333   Cmi_myrank = 0;
334   MPI_Init(&argc, &argv);
335   MPI_Comm_size(MPI_COMM_WORLD, &Cmi_numpes);
336   MPI_Comm_rank(MPI_COMM_WORLD, &Cmi_mype);
337   /* find dim = log2(numpes), to pretend we are a hypercube */
338   for ( Cmi_dim=0,n=Cmi_numpes; n>1; n/=2 )
339     Cmi_dim++ ;
340  /* CmiSpanTreeInit();*/
341   i=0;
342   request_max=MAX_QLEN;
343   while (argv[i] != 0) {
344     if (strncmp(argv[i], "+requestmax",11) == 0) {
345       if (strlen(argv[i]) > 11)
346         sscanf(argv[i], "+p%d", &request_max);
347       else
348         sscanf(argv[i+1], "%d", &request_max);
349     } 
350     i++;
351   }
352   /*printf("request max=%d\n", request_max);*/
353   CmiTimerInit();
354   CpvInitialize(void *, CmiLocalQueue);
355   CpvAccess(CmiLocalQueue) = (void *)FIFO_Create();
356   recdQueueInit();
357   CthInit(argv);
358   ConverseCommonInit(argv);
359   if (initret==0) {
360     fn(argc, argv);
361     if (usched==0) CsdScheduler(-1);
362     ConverseExit();
363   }
364 }
365
366 /***********************************************************************
367  *
368  * Abort function:
369  *
370  ************************************************************************/
371
372 void CmiAbort(const char *message)
373 {
374   CmiError(message);
375   MPI_Abort(MPI_COMM_WORLD, 1);
376 }
377  
378 /* ****************************************************************** */
379 /*    The following internal functions implement recd msg queue       */
380 /* ****************************************************************** */
381
382 static void ** AllocBlock(unsigned int len)
383 {
384   void ** blk;
385
386   blk=(void **)CmiAlloc(len*sizeof(void *));
387   if(blk==(void **)0) {
388     CmiError("Cannot Allocate Memory!\n");
389     MPI_Abort(MPI_COMM_WORLD, 1);
390   }
391   return blk;
392 }
393
394 static void 
395 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
396 {
397   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
398   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
399 }
400
401 void recdQueueInit(void)
402 {
403   recdQueue_blk = AllocBlock(BLK_LEN);
404   recdQueue_blk_len = BLK_LEN;
405   recdQueue_first = 0;
406   recdQueue_len = 0;
407 }
408
409 void recdQueueAddToBack(void *element)
410 {
411   inside_comm = 1;
412   if(recdQueue_len==recdQueue_blk_len) {
413     void **blk;
414     recdQueue_blk_len *= 3;
415     blk = AllocBlock(recdQueue_blk_len);
416     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
417     CmiFree(recdQueue_blk);
418     recdQueue_blk = blk;
419     recdQueue_first = 0;
420   }
421   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
422   inside_comm = 0;
423 }
424
425
426 void * recdQueueRemoveFromFront(void)
427 {
428   if(recdQueue_len) {
429     void *element;
430     element = recdQueue_blk[recdQueue_first++];
431     recdQueue_first %= recdQueue_blk_len;
432     recdQueue_len--;
433     return element;
434   }
435   return 0;
436 }
437