c9b1008c9b089ce1a2b6affe074a3eb29426a89d
[charm.git] / src / arch / origin-pthreads / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * Origin Pthreads machine layer
10  * @ingroup Machine
11  * @{
12  */
13
14 #include <errno.h>
15 #include <pthread.h>
16 #include <sched.h>
17 #include <time.h>
18
19 #include <stdio.h>
20 #include <sys/types.h>
21 #include <sys/time.h>
22 #include <limits.h>
23 #include <unistd.h>
24
25 #include "converse.h"
26
27 #define BLK_LEN  512
28
29 typedef struct {
30   pthread_mutex_t mutex;
31   pthread_cond_t cond;
32   int waiting;
33   CdsFifo q;
34 } McQueue;
35
36 static McQueue *McQueueCreate(void);
37 static void McQueueAddToBack(McQueue *queue, void *element);
38 static void *McQueueRemoveFromFront(McQueue *queue);
39 static McQueue **MsgQueue;
40
41 CpvDeclare(void*, CmiLocalQueue);
42
43 int Cmi_argc;
44 int _Cmi_numpes;
45 int Cmi_usched;
46 int Cmi_initret;
47 CmiStartFn Cmi_startFn;
48
49 pthread_key_t perThreadKey;
50
51 static void *threadInit(void *arg);
52
53 pthread_mutex_t memory_mutex;
54
55 void CmiMemLock() {pthread_mutex_lock(&memory_mutex);}
56 void CmiMemUnlock() {pthread_mutex_unlock(&memory_mutex);}
57
58 int barrier;
59 pthread_cond_t barrier_cond;
60 pthread_mutex_t barrier_mutex;
61
62 void CmiNodeBarrier(void)
63 {
64   pthread_mutex_lock(&barrier_mutex);
65   barrier++;
66   if(barrier!=CmiNumPes())
67     pthread_cond_wait(&barrier_cond, &barrier_mutex);
68   else {
69     barrier = 0;
70     pthread_cond_broadcast(&barrier_cond);
71   }
72   pthread_mutex_unlock(&barrier_mutex);
73 }
74
75 void CmiNodeAllBarrier(void)
76 {
77   pthread_mutex_lock(&barrier_mutex);
78   barrier++;
79   if(barrier!=CmiNumPes()+1)
80     pthread_cond_wait(&barrier_cond, &barrier_mutex);
81   else {
82     barrier = 0;
83     pthread_cond_broadcast(&barrier_cond);
84   }
85   pthread_mutex_unlock(&barrier_mutex);
86 }
87
88 CmiNodeLock CmiCreateLock(void)
89 {
90   pthread_mutex_t *lock;
91   lock = (pthread_mutex_t *) CmiAlloc(sizeof(pthread_mutex_t));
92   pthread_mutex_init(lock, (pthread_mutexattr_t *) 0);
93   return lock;
94 }
95
96 void CmiLock(CmiNodeLock lock)
97 {
98   pthread_mutex_lock(lock);
99 }
100
101 void CmiUnlock(CmiNodeLock lock)
102 {
103   pthread_mutex_unlock(lock);
104 }
105
106 int CmiTryLock(CmiNodeLock lock)
107 {
108   return pthread_mutex_trylock(lock);
109 }
110
111 void CmiDestroyLock(CmiNodeLock lock)
112 {
113   pthread_mutex_destroy(lock);
114 }
115
116 int CmiMyPe()
117 {
118   int mype = (size_t) pthread_getspecific(perThreadKey);
119   return mype;
120 }
121
122 /***********************************************************************
123  *
124  * Abort function:
125  *
126  ************************************************************************/
127
128 void CmiAbort(const char *message)
129 {
130   CmiError(message);
131   abort();
132 }
133
134 int CmiAsyncMsgSent(CmiCommHandle msgid)
135 {
136   return 1;
137 }
138
139
140 typedef struct {
141   char       **argv;
142   int        mype;
143 } USER_PARAMETERS;
144
145 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
146 {
147   int i;
148   USER_PARAMETERS *usrparam;
149   pthread_t *aThread;
150  
151   _Cmi_numpes = 0; 
152   Cmi_usched = usched;
153   Cmi_initret = initret;
154   Cmi_startFn = fn;
155
156   CmiGetArgInt(argv,"+p",&_Cmi_numpes);
157   if (_Cmi_numpes <= 0)
158   {
159     CmiError("Error: requested number of processors is invalid %d\n",
160               _Cmi_numpes);
161     abort();
162   }
163
164
165   pthread_mutex_init(&memory_mutex, (pthread_mutexattr_t *) 0);
166
167   MsgQueue=(McQueue **)CmiAlloc(_Cmi_numpes*sizeof(McQueue *));
168   for(i=0; i<_Cmi_numpes; i++) 
169     MsgQueue[i] = McQueueCreate();
170
171   pthread_key_create(&perThreadKey, (void *) 0);
172   barrier = 0;
173   pthread_cond_init(&barrier_cond, (pthread_condattr_t *) 0);
174   pthread_mutex_init(&barrier_mutex, (pthread_mutexattr_t *) 0);
175
176   /* suggest to IRIX that we actually use the right number of processors */
177   pthread_setconcurrency(_Cmi_numpes);
178
179   Cmi_argc = CmiGetArgc(argv);
180   aThread = (pthread_t *) CmiAlloc(sizeof(pthread_t) * _Cmi_numpes);
181   for(i=1; i<_Cmi_numpes; i++) {
182     usrparam = (USER_PARAMETERS *) CmiAlloc(sizeof(USER_PARAMETERS));
183     usrparam->argv = CmiCopyArgs(argv);
184     usrparam->mype = i;
185
186     pthread_create(&aThread[i],(pthread_attr_t *)0,threadInit,(void *)usrparam);
187   }
188   usrparam = (USER_PARAMETERS *) CmiAlloc(sizeof(USER_PARAMETERS));
189   usrparam->argv = CmiCopyArgs(argv);
190   usrparam->mype = 0;
191   threadInit(usrparam);
192 }
193
194 void CmiTimerInit(void);
195
196 static void *threadInit(void *arg)
197 {
198   USER_PARAMETERS *usrparam;
199   usrparam = (USER_PARAMETERS *) arg;
200
201
202   pthread_setspecific(perThreadKey, (void *) usrparam->mype);
203
204   CthInit(usrparam->argv);
205   ConverseCommonInit(usrparam->argv);
206   CpvInitialize(void*, CmiLocalQueue);
207   CpvAccess(CmiLocalQueue) = CdsFifo_Create();
208   CmiTimerInit();
209   if (Cmi_initret==0) {
210     Cmi_startFn(Cmi_argc, usrparam->argv);
211     if (Cmi_usched==0) CsdScheduler(-1);
212     ConverseExit();
213   }
214   return (void *) 0;
215 }
216
217
218 void ConverseExit(void)
219 {
220   ConverseCommonExit();
221   CmiNodeBarrier();
222 }
223
224
225 void CmiDeclareArgs(void)
226 {
227 }
228
229
230 void CmiNotifyIdle()
231 {
232   McQueue *queue = MsgQueue[CmiMyPe()];
233   struct timespec ts;
234   pthread_mutex_lock(&(queue->mutex));
235   if(CdsFifo_Empty(queue->q)){
236     queue->waiting++;
237     ts.tv_sec = (time_t) 0;
238     ts.tv_nsec = 10000000L;
239     pthread_cond_timedwait(&(queue->cond), &(queue->mutex), &ts);
240     queue->waiting--;
241   }
242   pthread_mutex_unlock(&(queue->mutex));
243   return;
244 }
245
246 void *CmiGetNonLocal()
247 {
248   return McQueueRemoveFromFront(MsgQueue[CmiMyPe()]);
249 }
250
251
252 void CmiSyncSendFn(int destPE, int size, char *msg)
253 {
254   char *buf;
255
256   buf=(void *)CmiAlloc(size);
257   memcpy(buf,msg,size);
258   McQueueAddToBack(MsgQueue[destPE],buf); 
259   CQdCreate(CpvAccess(cQdState), 1);
260 }
261
262
263 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
264 {
265   CmiSyncSendFn(destPE, size, msg); 
266   return 0;
267 }
268
269
270 void CmiFreeSendFn(int destPE, int size, char *msg)
271 {
272   if (CmiMyPe()==destPE) {
273     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
274   } else {
275     McQueueAddToBack(MsgQueue[destPE],msg); 
276   }
277   CQdCreate(CpvAccess(cQdState), 1);
278 }
279
280 void CmiSyncBroadcastFn(int size, char *msg)
281 {
282   int i;
283   for(i=0; i<_Cmi_numpes; i++)
284     if (CmiMyPe() != i) CmiSyncSendFn(i,size,msg);
285 }
286
287 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
288 {
289   CmiSyncBroadcastFn(size, msg);
290   return 0;
291 }
292
293 void CmiFreeBroadcastFn(int size, char *msg)
294 {
295   CmiSyncBroadcastFn(size,msg);
296   CmiFree(msg);
297 }
298
299 void CmiSyncBroadcastAllFn(int size, char *msg)
300 {
301   int i;
302   for(i=0; i<CmiNumPes(); i++) 
303     CmiSyncSendFn(i,size,msg);
304 }
305
306
307 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
308 {
309   CmiSyncBroadcastAllFn(size, msg);
310   return 0; 
311 }
312
313
314 void CmiFreeBroadcastAllFn(int size, char *msg)
315 {
316   int i;
317   for(i=0; i<CmiNumPes(); i++) {
318     if(CmiMyPe() != i) {
319       CmiSyncSendFn(i,size,msg);
320     }
321   }
322   CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
323   CQdCreate(CpvAccess(cQdState), 1);
324 }
325
326 /* ****************************************************************** */
327 /*    The following internal functions implements FIFO queues for     */
328 /*    messages. These queues are shared among threads                 */
329 /* ****************************************************************** */
330
331 static void ** AllocBlock(unsigned int len)
332 {
333   void **blk;
334
335   blk=(void **)CmiAlloc(len*sizeof(void *));
336   return blk;
337 }
338
339 static void 
340 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
341 {
342   memcpy(destblk, &(srcblk[first]), (len-first)*sizeof(void *));
343   memcpy(&(destblk[len-first]),srcblk,first*sizeof(void *));
344 }
345
346 McQueue * McQueueCreate(void)
347 {
348   McQueue *queue;
349
350   queue = (McQueue *) CmiAlloc(sizeof(McQueue));
351   pthread_mutex_init(&(queue->mutex), (pthread_mutexattr_t *) 0);
352   pthread_cond_init(&(queue->cond), (pthread_condattr_t *) 0);
353   queue->waiting = 0;
354   queue->q = CdsFifo_Create_len(BLK_LEN);
355   return queue;
356 }
357
358 void McQueueAddToBack(McQueue *queue, void *element)
359 {
360   pthread_mutex_lock(&(queue->mutex));
361   CdsFifo_Enqueue(queue->q, element);
362   if(queue->waiting) {
363     pthread_cond_broadcast(&(queue->cond));
364   }
365   pthread_mutex_unlock(&(queue->mutex));
366 }
367
368
369 void * McQueueRemoveFromFront(McQueue *queue)
370 {
371   void *element = 0;
372   pthread_mutex_lock(&(queue->mutex));
373   element = CdsFifo_Dequeue(queue->q);
374   pthread_mutex_unlock(&(queue->mutex));
375   return element;
376 }
377
378 /* Timer Routines */
379
380
381 CpvStaticDeclare(double,inittime_wallclock);
382 CpvStaticDeclare(double,inittime_virtual);
383
384 void CmiTimerInit(void)
385 {
386   struct timespec temp;
387   CpvInitialize(double, inittime_wallclock);
388   CpvInitialize(double, inittime_virtual);
389   clock_gettime(CLOCK_SGI_CYCLE, &temp);
390   CpvAccess(inittime_wallclock) = (double) temp.tv_sec +
391                                   1e-9 * temp.tv_nsec;
392   CpvAccess(inittime_virtual) = CpvAccess(inittime_wallclock);
393 }
394
395 double CmiWallTimer(void)
396 {
397   struct timespec temp;
398   double currenttime;
399
400   clock_gettime(CLOCK_SGI_CYCLE, &temp);
401   currenttime = (double) temp.tv_sec +
402                 1e-9 * temp.tv_nsec;
403   return (currenttime - CpvAccess(inittime_wallclock));
404 }
405
406 double CmiCpuTimer(void)
407 {
408   struct timespec temp;
409   double currenttime;
410
411   clock_gettime(CLOCK_SGI_CYCLE, &temp);
412   currenttime = (double) temp.tv_sec +
413                 1e-9 * temp.tv_nsec;
414   return (currenttime - CpvAccess(inittime_virtual));
415 }
416
417 double CmiTimer(void)
418 {
419   return CmiCpuTimer();
420 }
421
422 /*@}*/