1f318a391589a271ffc5a4a5ce8521fa4ee63862
[charm.git] / src / arch / origin-pthreads / machine.c
1 #include <errno.h>
2 #include <pthread.h>
3 #include <sched.h>
4 #include <time.h>
5
6 #include <stdio.h>
7 #include <sys/types.h>
8 #include <sys/time.h>
9 #include <limits.h>
10 #include <unistd.h>
11
12 #include "converse.h"
13 #include "fifo.h"
14
15 #define BLK_LEN  512
16
17 typedef struct {
18   pthread_mutex_t mutex;
19   pthread_cond_t cond;
20   int waiting;
21   void     **blk;
22   unsigned int blk_len;
23   unsigned int first;
24   unsigned int len;
25   unsigned int maxlen;
26 } McQueue;
27
28 static McQueue *McQueueCreate(void);
29 static void McQueueAddToBack(McQueue *queue, void *element);
30 static void *McQueueRemoveFromFront(McQueue *queue);
31 static McQueue **MsgQueue;
32
33 CpvDeclare(void*, CmiLocalQueue);
34
35 int Cmi_argc;
36 int Cmi_numpes;
37 int Cmi_usched;
38 int Cmi_initret;
39 CmiStartFn Cmi_startFn;
40
41 pthread_key_t perThreadKey;
42
43 static void *threadInit(void *arg);
44
45 pthread_mutex_t memory_mutex;
46
47 void CmiMemLock() {pthread_mutex_lock(&memory_mutex);}
48 void CmiMemUnlock() {pthread_mutex_unlock(&memory_mutex);}
49
50 int barrier;
51 pthread_cond_t barrier_cond;
52 pthread_mutex_t barrier_mutex;
53
54 void CmiNodeBarrier(void)
55 {
56   pthread_mutex_lock(&barrier_mutex);
57   barrier++;
58   if(barrier!=CmiNumPes())
59     pthread_cond_wait(&barrier_cond, &barrier_mutex);
60   else {
61     barrier = 0;
62     pthread_cond_broadcast(&barrier_cond);
63   }
64   pthread_mutex_unlock(&barrier_mutex);
65 }
66
67 CmiNodeLock CmiCreateLock(void)
68 {
69   pthread_mutex_t *lock;
70   lock = (pthread_mutex_t *) CmiAlloc(sizeof(pthread_mutex_t));
71   pthread_mutex_init(lock, (pthread_mutexattr_t *) 0);
72   return lock;
73 }
74
75 void CmiLock(CmiNodeLock lock)
76 {
77   pthread_mutex_lock(lock);
78 }
79
80 void CmiUnlock(CmiNodeLock lock)
81 {
82   pthread_mutex_unlock(lock);
83 }
84
85 int CmiTryLock(CmiNodeLock lock)
86 {
87   return pthread_mutex_trylock(lock);
88 }
89
90 void CmiDestroyLock(CmiNodeLock lock)
91 {
92   pthread_mutex_destroy(lock);
93 }
94
95 int CmiMyPe()
96 {
97   int mype = (size_t) pthread_getspecific(perThreadKey);
98   return mype;
99 }
100
101 /***********************************************************************
102  *
103  * Abort function:
104  *
105  ************************************************************************/
106
107 void CmiAbort(const char *message)
108 {
109   CmiError(message);
110   abort();
111 }
112
113 int CmiAsyncMsgSent(CmiCommHandle msgid)
114 {
115   return 1;
116 }
117
118
119 typedef struct {
120   char       **argv;
121   int        mype;
122 } USER_PARAMETERS;
123
124 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
125 {
126   int i,j;
127   char **uargv;
128   USER_PARAMETERS *usrparam;
129   pthread_t *aThread;
130  
131   Cmi_numpes = 0; 
132   Cmi_usched = usched;
133   Cmi_initret = initret;
134   Cmi_startFn = fn;
135   Cmi_argc = argc;
136
137   for(i=0;i<Cmi_argc;i++) {
138     if (strcmp(argv[i], "+p") == 0) {
139       sscanf(argv[i+1], "%d", &Cmi_numpes);
140       break;
141     } else {
142       if (sscanf(argv[i], "+p%d", &Cmi_numpes) == 1) 
143         break;
144     }
145   }
146
147   if (Cmi_numpes <= 0)
148   {
149     CmiError("Error: requested number of processors is invalid %d\n",
150               Cmi_numpes);
151     abort();
152   }
153
154
155   pthread_mutex_init(&memory_mutex, (pthread_mutexattr_t *) 0);
156
157   MsgQueue=(McQueue **)CmiAlloc(Cmi_numpes*sizeof(McQueue *));
158   if (MsgQueue == (McQueue **)0) {
159     CmiError("Cannot Allocate Memory...\n");
160     abort();
161   }
162   for(i=0; i<Cmi_numpes; i++) 
163     MsgQueue[i] = McQueueCreate();
164
165   pthread_key_create(&perThreadKey, (void *) 0);
166   barrier = 0;
167   pthread_cond_init(&barrier_cond, (pthread_condattr_t *) 0);
168   pthread_mutex_init(&barrier_mutex, (pthread_mutexattr_t *) 0);
169
170   /* suggest to IRIX that we actually use the right number of processors */
171   pthread_setconcurrency(Cmi_numpes);
172
173   aThread = (pthread_t *) CmiAlloc(sizeof(pthread_t) * Cmi_numpes);
174   for(i=1; i<Cmi_numpes; i++) {
175     uargv = (char **) CmiAlloc(sizeof(char *) * (Cmi_argc+1));
176
177     for (j=0;j<Cmi_argc;j++)
178       uargv[j] = argv[j];
179     uargv[j] = 0;
180
181     usrparam = (USER_PARAMETERS *) CmiAlloc(sizeof(USER_PARAMETERS));
182     usrparam->argv = uargv;
183     usrparam->mype = i;
184
185     pthread_create(&aThread[i],(pthread_attr_t *)0,threadInit,(void *)usrparam);
186   }
187   uargv = (char **) CmiAlloc(sizeof(char *) * (Cmi_argc+1));
188   for (j=0;j<Cmi_argc;j++)
189     uargv[j] = argv[j];
190   uargv[j] = 0;
191   usrparam = (USER_PARAMETERS *) CmiAlloc(sizeof(USER_PARAMETERS));
192   usrparam->argv = uargv;
193   usrparam->mype = 0;
194   threadInit(usrparam);
195 }
196
197 void CmiTimerInit(void);
198
199 static void *threadInit(void *arg)
200 {
201   USER_PARAMETERS *usrparam;
202   usrparam = (USER_PARAMETERS *) arg;
203
204
205   pthread_setspecific(perThreadKey, (void *) usrparam->mype);
206
207   CthInit(usrparam->argv);
208   ConverseCommonInit(usrparam->argv);
209   CpvInitialize(void*, CmiLocalQueue);
210   CpvAccess(CmiLocalQueue) = (void *) FIFO_Create();
211   CmiTimerInit();
212   if (Cmi_initret==0) {
213     Cmi_startFn(Cmi_argc, usrparam->argv);
214     if (Cmi_usched==0) CsdScheduler(-1);
215     ConverseExit();
216   }
217   return (void *) 0;
218 }
219
220
221 void ConverseExit(void)
222 {
223   ConverseCommonExit();
224   CmiNodeBarrier();
225 }
226
227
228 void CmiDeclareArgs(void)
229 {
230 }
231
232
233 void CmiNotifyIdle()
234 {
235   McQueue *queue = MsgQueue[CmiMyPe()];
236   struct timespec ts;
237   pthread_mutex_lock(&(queue->mutex));
238   if(!queue->len){
239     queue->waiting++;
240     ts.tv_sec = (time_t) 0;
241     ts.tv_nsec = 10000000L;
242     pthread_cond_timedwait(&(queue->cond), &(queue->mutex), &ts);
243     queue->waiting--;
244   }
245   pthread_mutex_unlock(&(queue->mutex));
246   return;
247 }
248
249 void *CmiGetNonLocal()
250 {
251   return McQueueRemoveFromFront(MsgQueue[CmiMyPe()]);
252 }
253
254
255 void CmiSyncSendFn(int destPE, int size, char *msg)
256 {
257   char *buf;
258
259   buf=(void *)CmiAlloc(size);
260   memcpy(buf,msg,size);
261   McQueueAddToBack(MsgQueue[destPE],buf); 
262 }
263
264
265 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
266 {
267   CmiSyncSendFn(destPE, size, msg); 
268   return 0;
269 }
270
271
272 void CmiFreeSendFn(int destPE, int size, char *msg)
273 {
274   if (CmiMyPe()==destPE) {
275     FIFO_EnQueue(CpvAccess(CmiLocalQueue),msg);
276   } else {
277     McQueueAddToBack(MsgQueue[destPE],msg); 
278   }
279 }
280
281 void CmiSyncBroadcastFn(int size, char *msg)
282 {
283   int i;
284   for(i=0; i<Cmi_numpes; i++)
285     if (CmiMyPe() != i) CmiSyncSendFn(i,size,msg);
286 }
287
288 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
289 {
290   CmiSyncBroadcastFn(size, msg);
291   return 0;
292 }
293
294 void CmiFreeBroadcastFn(int size, char *msg)
295 {
296   CmiSyncBroadcastFn(size,msg);
297   CmiFree(msg);
298 }
299
300 void CmiSyncBroadcastAllFn(int size, char *msg)
301 {
302   int i;
303   for(i=0; i<CmiNumPes(); i++) 
304     CmiSyncSendFn(i,size,msg);
305 }
306
307
308 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
309 {
310   CmiSyncBroadcastAllFn(size, msg);
311   return 0; 
312 }
313
314
315 void CmiFreeBroadcastAllFn(int size, char *msg)
316 {
317   int i;
318   for(i=0; i<CmiNumPes(); i++) {
319     if(CmiMyPe() != i) {
320       CmiSyncSendFn(i,size,msg);
321     }
322   }
323   FIFO_EnQueue(CpvAccess(CmiLocalQueue),msg);
324 }
325
326 /* ****************************************************************** */
327 /*    The following internal functions implements FIFO queues for     */
328 /*    messages. These queues are shared among threads                 */
329 /* ****************************************************************** */
330
331 static void ** AllocBlock(unsigned int len)
332 {
333   void **blk;
334
335   blk=(void **)CmiAlloc(len*sizeof(void *));
336   return blk;
337 }
338
339 static void 
340 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
341 {
342   memcpy(destblk, &(srcblk[first]), (len-first)*sizeof(void *));
343   memcpy(&(destblk[len-first]),srcblk,first*sizeof(void *));
344 }
345
346 McQueue * McQueueCreate(void)
347 {
348   McQueue *queue;
349
350   queue = (McQueue *) CmiAlloc(sizeof(McQueue));
351   pthread_mutex_init(&(queue->mutex), (pthread_mutexattr_t *) 0);
352   pthread_cond_init(&(queue->cond), (pthread_condattr_t *) 0);
353   queue->waiting = 0;
354   queue->blk = AllocBlock(BLK_LEN);
355   queue->blk_len = BLK_LEN;
356   queue->first = 0;
357   queue->len = 0;
358   queue->maxlen = 0;
359   return queue;
360 }
361
362 void McQueueAddToBack(McQueue *queue, void *element)
363 {
364   pthread_mutex_lock(&(queue->mutex));
365   if(queue->len==queue->blk_len) {
366     void **blk;
367
368     queue->blk_len *= 3;
369     blk = AllocBlock(queue->blk_len);
370     SpillBlock(queue->blk, blk, queue->first, queue->len);
371     CmiFree(queue->blk);
372     queue->blk = blk;
373     queue->first = 0;
374   }
375   queue->blk[(queue->first+queue->len++)%queue->blk_len] = element;
376   if(queue->len>queue->maxlen)
377     queue->maxlen = queue->len;
378   if(queue->waiting) {
379     pthread_cond_broadcast(&(queue->cond));
380   }
381   pthread_mutex_unlock(&(queue->mutex));
382 }
383
384
385 void * McQueueRemoveFromFront(McQueue *queue)
386 {
387   void *element = 0;
388   pthread_mutex_lock(&(queue->mutex));
389   if(queue->len) {
390     element = queue->blk[queue->first++];
391     queue->first = (queue->first+queue->blk_len)%queue->blk_len;
392     queue->len--;
393   }
394   pthread_mutex_unlock(&(queue->mutex));
395   return element;
396 }
397
398 /* Timer Routines */
399
400
401 CpvStaticDeclare(double,inittime_wallclock);
402 CpvStaticDeclare(double,inittime_virtual);
403
404 void CmiTimerInit(void)
405 {
406   struct timespec temp;
407   CpvInitialize(double, inittime_wallclock);
408   CpvInitialize(double, inittime_virtual);
409   clock_gettime(CLOCK_SGI_CYCLE, &temp);
410   CpvAccess(inittime_wallclock) = (double) temp.tv_sec +
411                                   1e-9 * temp.tv_nsec;
412   CpvAccess(inittime_virtual) = CpvAccess(inittime_wallclock);
413 }
414
415 double CmiWallTimer(void)
416 {
417   struct timespec temp;
418   double currenttime;
419
420   clock_gettime(CLOCK_SGI_CYCLE, &temp);
421   currenttime = (double) temp.tv_sec +
422                 1e-9 * temp.tv_nsec;
423   return (currenttime - CpvAccess(inittime_wallclock));
424 }
425
426 double CmiCpuTimer(void)
427 {
428   struct timespec temp;
429   double currenttime;
430
431   clock_gettime(CLOCK_SGI_CYCLE, &temp);
432   currenttime = (double) temp.tv_sec +
433                 1e-9 * temp.tv_nsec;
434   return (currenttime - CpvAccess(inittime_virtual));
435 }
436
437 double CmiTimer(void)
438 {
439   return CmiCpuTimer();
440 }
441