doc: Add serial to list of ci file reserved words
[charm.git] / src / conv-core / cpthreads.c
1 /*
2  * TO-DO:
3  *
4  * what about shared memory machines?
5  *
6  * what about the fact that posix threads programs exit when all
7  * threads have completed?
8  *
9  * write errcode, errspan.  Figure out errno thing.
10  *
11  * there's obviously something I don't understand about cond... what's
12  * the mutex for?
13  *
14  * how are we going to implement pthread_cond_timedwait?
15  *
16  * what about shared-memory locks?
17  *
18  * implement inheritance of processor-private data (in Cth)
19  *
20  */
21
22 #define CPTHREAD_IS_HERE
23 #define SUPPRESS_PTHREADS
24 #include "cpthreads.h"
25 #include <stdlib.h>
26 #include <errno.h>
27
28 /******************************************************************************
29  *
30  * Magic Numbers
31  *
32  * Each of the structures we use in this file has a magic number associated
33  * with it for error-checking purposes.
34  *
35  *****************************************************************************/
36
37 #define PT_MAGIC    0x8173292a
38 #define ATTR_MAGIC  0x783a2004
39 #define KEY_MAGIC   0x99934315
40 #define FKEY_MAGIC  0x99934315
41 #define MATTR_MAGIC 0x12673434
42 #define CATTR_MAGIC 0xA865B812
43
44 #undef MUTEX_MAGIC
45 #undef COND_MAGIC
46 #define MUTEX_MAGIC 0x13237770
47 #define COND_MAGIC  0x99431664
48
49 /******************************************************************************
50  *
51  * The Thread Structure Definition.
52  *
53  *****************************************************************************/
54  
55 typedef void *(*voidfn)();
56
57 struct Cpthread_s
58 {
59   int magic;
60   voidfn startfn;
61   void *startarg1;
62   void *startarg2;
63   void *startarg3;
64   int detached;
65   void *joinstatus;
66   Cpthread_cleanup_t cleanups;
67   CthThread waiting;
68   CthThread thread;
69 };
70
71 #define errcode(n) { Cpthread_errno=(n); return -1; }
72
73
74 /******************************************************************************
75  *
76  * POSIX Thread private data.
77  *
78  * Posix references thread_private data by keys.  For each key created, we
79  * use CthRegister to allocate another 4 bytes of Cth thread-private space.  
80  * The offset returned by CthRegister and the key's destructor (if any)
81  * are stored in the POSIX thread-private key structure.  We keep a list
82  * of all the active keys so that when a thread exits, we can execute
83  * all the thread-private destructors.  Since we can't really destroy
84  * a key (there's no CthUnRegister), we simply put ``destroyed'' keys onto
85  * a list of inactive keys and reuse them later.
86  *
87  *****************************************************************************/
88
89 struct Cpthread_key_s
90 {
91   int magic;
92   int offset;
93   void (*destructo)(void *);
94   Cpthread_key_t next;
95 };
96
97 Cpthread_key_t keys_active = 0;
98 Cpthread_key_t keys_inactive = 0;
99
100 int Cpthread_key_create(Cpthread_key_t *keyp, void (*destructo)(void *))
101 {
102   Cpthread_key_t key;
103   key = keys_inactive;
104   if (key) {
105     keys_inactive = key->next;
106   } else {
107     key = (Cpthread_key_t)malloc(sizeof(struct Cpthread_key_s));
108     _MEMCHECK(key);
109     key->offset = CthRegister(sizeof(void *));
110   }
111   key->magic = KEY_MAGIC;
112   key->destructo = destructo;
113   key->next = keys_active;
114   keys_active = key;
115   *keyp = key;
116   return 0;
117 }
118
119 int Cpthread_key_delete(Cpthread_key_t key)
120 {
121   Cpthread_key_t active = keys_active;
122   if (key->magic != KEY_MAGIC) errcode(EINVAL);
123   if (active==key) {
124     keys_active = key->next;
125   } else {
126     while (active) {
127       if (active->next == key) {
128         active->next = key->next;
129         goto deleted;
130       }
131       active = active->next;
132     }
133     return -1;
134   }
135 deleted:
136   key->magic = FKEY_MAGIC;
137   key->next = keys_inactive;
138   keys_inactive = key;
139   return 0;
140 }
141
142 int Cpthread_setspecific(Cpthread_key_t key, void *val)
143 {
144   char *data;
145   data = CthCpvAccess(CthData);
146   if (key->magic != KEY_MAGIC) errcode(EINVAL);
147   *((void **)(data+(key->offset))) = val;
148   return 0;
149 }
150
151 void *Cpthread_getspecific(Cpthread_key_t key)
152 {
153   char *data = CthCpvAccess(CthData);
154   if (key->magic != KEY_MAGIC) return 0;
155   return *((void **)(data+(key->offset)));
156 }
157
158 /******************************************************************************
159  *
160  * Cleanup routines.
161  *
162  * Every thread has a hook for cleanup routines that are to be automatically
163  * called at exit-time.  These functions add cleanup-routines to a thread.
164  *
165  *****************************************************************************/
166
167 struct Cpthread_cleanup_s
168 {
169   void (*routine)(void *);
170   void *argument;
171   Cpthread_cleanup_t next;
172 };
173
174 void Cpthread_cleanup_push(void (*routine)(void*), void *arg)
175 {
176   Cpthread_t pt = CtvAccess(Cpthread_current);
177   Cpthread_cleanup_t c =
178     (Cpthread_cleanup_t)malloc(sizeof(struct Cpthread_cleanup_s));
179   _MEMCHECK(c);
180   c->routine = routine;
181   c->argument = arg;
182   c->next = pt->cleanups;
183   pt->cleanups = c;
184 }
185
186 void Cpthread_cleanup_pop(int execute)
187 {
188   Cpthread_t pt = CtvAccess(Cpthread_current);
189   Cpthread_cleanup_t c = pt->cleanups;
190   if (c) {
191     pt->cleanups = c->next;
192     if (execute) (c->routine)(c->argument);
193     free(c);
194   }
195 }
196
197 /******************************************************************************
198  *
199  * Thread Attributes
200  *
201  * Threads have two attributes set at creation time:
202  *
203  *    1. stack size.
204  *    2. detached or not.
205  *
206  * These attributes must be put into an attribute structure before
207  * calling the thread creation function.
208  *
209  *****************************************************************************/
210
211 int Cpthread_attr_init(Cpthread_attr_t *attr)
212 {
213   attr->magic = ATTR_MAGIC;
214   attr->detached = 0;
215   attr->stacksize = 0;
216   return 0;
217 }
218
219 int Cpthread_attr_destroy(Cpthread_attr_t *attr)
220 {
221   if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
222   attr->magic = 0;
223   return 0;
224 }
225
226 int Cpthread_attr_getstacksize(Cpthread_attr_t *attr, size_t *size)
227 {
228   if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
229   *size = attr->stacksize;
230   return 0;
231 }
232
233 int Cpthread_attr_setstacksize(Cpthread_attr_t *attr, size_t size)
234 {
235   if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
236   attr->stacksize = size;
237   return 0;
238 }
239
240 int Cpthread_attr_getdetachstate(Cpthread_attr_t *attr, int *state)
241 {
242   if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
243   *state = attr->detached;
244   return 0;
245 }
246
247 int Cpthread_attr_setdetachstate(Cpthread_attr_t *attr, int state)
248 {
249   if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
250   attr->detached = state;
251   return 0;
252 }
253
254 /******************************************************************************
255  *
256  * Thread primary operations: create, destroy, equal, self, detach, join
257  *
258  * Every thread is associated with a CthThread and a pthread_t (which are
259  * separate from each other).  The pthread_t contains a field pointing to the
260  * CthThread, and the CthThread has a thread-private variable ``Cpthread_current''
261  * pointing to the pthread_t.
262  *
263  *****************************************************************************/
264
265 void Cpthread_top(Cpthread_t pt)
266 {
267   Cpthread_key_t k; char *data; void *result; 
268
269   data = CthCpvAccess(CthData);
270   for (k=keys_active; k; k=k->next)
271     *(void **)(data+(k->offset)) = 0;
272   CtvAccess(Cpthread_errcode) = 0;
273   CtvAccess(Cpthread_current) = pt;
274   result = (pt->startfn)(pt->startarg1, pt->startarg2, pt->startarg3);
275   Cpthread_exit(result);
276 }
277
278 int Cpthread_create3(Cpthread_t *thread, Cpthread_attr_t *attr,
279                      voidfn fn, void *a1, void *a2, void *a3)
280 {
281   Cpthread_t pt;
282   if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
283   pt = (Cpthread_t)malloc(sizeof(struct Cpthread_s));
284   _MEMCHECK(pt);
285   pt->magic = PT_MAGIC;
286   pt->startfn = fn;
287   pt->startarg1 = a1;
288   pt->startarg2 = a2;
289   pt->startarg3 = a3;
290   pt->detached = attr->detached;
291   pt->joinstatus = 0;
292   pt->cleanups = 0;
293   pt->waiting = 0;
294   pt->thread = CthCreate((CthVoidFn)Cpthread_top, (void *)pt, attr->stacksize);
295   CthSetStrategyDefault(pt->thread);
296   CthAwaken(pt->thread);
297   *thread = pt;
298   return 0;
299 }
300
301 int Cpthread_create(Cpthread_t *thread, Cpthread_attr_t *attr,
302                      voidfn fn, void *arg)
303 {
304   return Cpthread_create3(thread, attr, fn, arg, 0, 0);
305 }
306
307 void Cpthread_exit(void *status)
308 {
309   Cpthread_t pt; Cpthread_cleanup_t c, cn; Cpthread_key_t k;
310   void *priv; char *data; CthThread t;
311
312   pt = CtvAccess(Cpthread_current);
313   t = pt->thread;
314   c = pt->cleanups;
315   data = CthCpvAccess(CthData);
316
317   /* perform all cleanup functions */
318   while (c) {
319     (c->routine)(c->argument);
320     cn = c->next;
321     free(c); c=cn;
322   }
323   /* execute destructors for thread-private data */
324   k = keys_active;
325   while (k) {
326     if (k->destructo) {
327       priv = *(void **)(data+(k->offset));
328       if (priv) (k->destructo)(priv);
329     }
330     k=k->next;
331   }
332   /* handle the join-operation */
333   if (pt->detached) {
334     pt->magic = 0;
335     free(pt);
336   } else {
337     pt->joinstatus = status;
338     pt->thread = 0;
339     if (pt->waiting) CthAwaken(pt->waiting);
340   }
341   CthFree(t);
342   CthSuspend();
343 }
344
345 int Cpthread_equal(Cpthread_t t1, Cpthread_t t2)
346 {
347   return (t1==t2);
348 }
349
350 int Cpthread_detach(Cpthread_t pt)
351 {
352   if (pt->magic != PT_MAGIC) errcode(EINVAL);
353   if (pt->thread==0) {
354     pt->magic = 0;
355     free(pt);
356   } else {
357     pt->detached = 1;
358   }
359   return 0;
360 }
361
362 int Cpthread_join(Cpthread_t pt, void **status)
363 {
364   if (pt->magic != PT_MAGIC) errcode(EINVAL);
365   if (pt->thread) {
366     pt->waiting = CthSelf();
367     CthSuspend();
368   }
369   *status = pt->joinstatus;
370   free(pt);
371   return 0;
372 }
373
374 int Cpthread_once(Cpthread_once_t *once, void (*fn)(void))
375 {
376   int rank = CmiMyRank();
377   if (rank>=32) {
378     CmiPrintf("error: cpthreads current implementation limited to 32 PE's per node.\n");
379     exit(1);
380   }
381   if (once->flag[rank]) return 0;
382   once->flag[rank] = 1;
383   fn();
384   return 1;
385 }
386
387 /******************************************************************************
388  *
389  * Synchronization Structure: MUTEX
390  *
391  * Caution, when updating this code: the COND routines also access
392  * the internals of the MUTEX structures. 
393  *
394  *****************************************************************************/
395
396 static void errspan()
397 {
398   CmiPrintf("Error: Cpthreads sync primitives do not work across processor boundaries.\n");
399   exit(1);
400 }
401
402 int Cpthread_mutexattr_init(Cpthread_mutexattr_t *mattr)
403 {
404   mattr->magic = MATTR_MAGIC;
405   return 0;
406 }
407
408 int Cpthread_mutexattr_destroy(Cpthread_mutexattr_t *mattr)
409 {
410   if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
411   mattr->magic = 0;
412   return 0;
413 }
414
415 int Cpthread_mutexattr_getpshared(Cpthread_mutexattr_t *mattr, int *pshared)
416 {
417   if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
418   *pshared = mattr->pshared;
419   return 0;
420 }
421
422 int Cpthread_mutexattr_setpshared(Cpthread_mutexattr_t *mattr, int pshared)
423 {
424   if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
425   mattr->pshared = pshared;
426   return 0;
427 }
428
429 int Cpthread_mutex_init(Cpthread_mutex_t *mutex, Cpthread_mutexattr_t *mattr)
430 {
431   if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
432   mutex->magic = MUTEX_MAGIC;
433   mutex->onpe = CmiMyPe();
434   mutex->users = CdsFifo_Create();
435   return 0;
436 }
437
438 int Cpthread_mutex_destroy(Cpthread_mutex_t *mutex)
439 {
440   if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
441   if (mutex->onpe != CmiMyPe()) errspan();
442   if (!CdsFifo_Empty(mutex->users)) errcode(EBUSY);
443   mutex->magic = 0;
444   CdsFifo_Destroy(mutex->users);
445   return 0;
446 }
447
448 int Cpthread_mutex_lock(Cpthread_mutex_t *mutex)
449 {
450   CthThread self = CthSelf();
451   if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
452   if (mutex->onpe != CmiMyPe()) errspan();
453   CdsFifo_Enqueue(mutex->users, self);
454   if (CdsFifo_Peek(mutex->users) != self) CthSuspend();
455   return 0;
456 }
457
458 int Cpthread_mutex_trylock(Cpthread_mutex_t *mutex)
459 {
460   CthThread self = CthSelf();
461   if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
462   if (mutex->onpe != CmiMyPe()) errspan();
463   if (!CdsFifo_Empty(mutex->users)) errcode(EBUSY);
464   CdsFifo_Enqueue(mutex->users, self);
465   return 0;
466 }
467
468 int Cpthread_mutex_unlock(Cpthread_mutex_t *mutex)
469 {
470   CthThread self = CthSelf();
471   CthThread sleeper;
472   if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
473   if (mutex->onpe != CmiMyPe()) errspan();
474   if (CdsFifo_Peek(mutex->users) != self) errcode(EPERM);
475   CdsFifo_Pop(mutex->users);
476   sleeper = CdsFifo_Peek(mutex->users);
477   if (sleeper) CthAwaken(sleeper);
478   return 0;
479 }
480
481 /******************************************************************************
482  *
483  * Synchronization Structure: COND
484  *
485  *****************************************************************************/
486
487 int Cpthread_condattr_init(Cpthread_condattr_t *cattr)
488 {
489   cattr->magic = CATTR_MAGIC;
490   return 0;
491 }
492
493 int Cpthread_condattr_destroy(Cpthread_condattr_t *cattr)
494 {
495   if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
496   return 0;
497 }
498
499 int Cpthread_condattr_getpshared(Cpthread_condattr_t *cattr, int *pshared)
500 {
501   if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
502   *pshared = cattr->pshared;
503   return 0;
504 }
505
506 int Cpthread_condattr_setpshared(Cpthread_condattr_t *cattr, int pshared)
507 {
508   if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
509   cattr->pshared = pshared;
510   return 0;
511 }
512
513 int Cpthread_cond_init(Cpthread_cond_t *cond, Cpthread_condattr_t *cattr)
514 {
515   if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
516   cond->magic = COND_MAGIC;
517   cond->onpe = CmiMyPe();
518   cond->users = CdsFifo_Create();
519   return 0;
520 }
521
522 int Cpthread_cond_destroy(Cpthread_cond_t *cond)
523 {
524   if (cond->magic != COND_MAGIC) errcode(EINVAL);
525   if (cond->onpe != CmiMyPe()) errspan();
526   cond->magic = 0;
527   CdsFifo_Destroy(cond->users);
528   return 0;
529 }
530
531 int Cpthread_cond_wait(Cpthread_cond_t *cond, Cpthread_mutex_t *mutex)
532 {
533   CthThread self = CthSelf();
534   CthThread sleeper;
535
536   if (cond->magic != COND_MAGIC) errcode(EINVAL);
537   if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
538   if (cond->onpe != CmiMyPe()) errspan();
539   if (mutex->onpe != CmiMyPe()) errspan();
540
541   if (CdsFifo_Peek(mutex->users) != self) errcode(EPERM);
542   CdsFifo_Pop(mutex->users);
543   sleeper = CdsFifo_Peek(mutex->users);
544   if (sleeper) CthAwaken(sleeper);
545   CdsFifo_Enqueue(cond->users, self);
546   CthSuspend();
547   CdsFifo_Enqueue(mutex->users, self);
548   if (CdsFifo_Peek(mutex->users) != self) CthSuspend();
549   return 0;
550 }
551
552 int Cpthread_cond_signal(Cpthread_cond_t *cond)
553 {
554   CthThread sleeper;
555   if (cond->magic != COND_MAGIC) errcode(EINVAL);
556   if (cond->onpe != CmiMyPe()) errspan();
557   sleeper = CdsFifo_Dequeue(cond->users);
558   if (sleeper) CthAwaken(sleeper);
559   return 0;
560 }
561
562 int Cpthread_cond_broadcast(Cpthread_cond_t *cond)
563 {
564   CthThread sleeper;
565   if (cond->magic != COND_MAGIC) errcode(EINVAL);
566   if (cond->onpe != CmiMyPe()) errspan();
567   while (1) {
568     sleeper = CdsFifo_Dequeue(cond->users);
569     if (sleeper==0) break;
570     CthAwaken(sleeper);
571   }
572   return 0;
573 }
574
575 /******************************************************************************
576  *
577  * Module initialization
578  *
579  *****************************************************************************/
580
581 typedef void (*mainfn)(int argc, char **argv);
582
583 int Cpthread_init()
584 {
585   return 0;
586 }
587
588 void CpthreadModuleInit()
589 {
590   CtvInitialize(Cpthread_t, Cpthread_current);
591   CtvInitialize(int,        Cpthread_errcode);
592 }
593
594 void Cpthread_start_main(mainfn fn, int argc, char **argv)
595 {
596   Cpthread_t pt;
597   Cpthread_attr_t attrib;
598   CmiIntPtr pargc = argc;
599   if (CmiMyRank()==0) {
600     Cpthread_attr_init(&attrib);
601     Cpthread_attr_setdetachstate(&attrib, 1);
602     Cpthread_create3(&pt, &attrib, (voidfn)fn, (void *)pargc, argv, 0);
603   }
604 }