fixed CmiRegisterHandler warnings.
[charm.git] / src / conv-core / futures.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdlib.h>
9 #include <string.h>
10 #include "converse.h"
11
12 typedef struct Cfuture_data_s
13 {
14   void      *value;
15   int        ready;
16   CthThread  waiters;
17 }
18 *futdata;
19
20 typedef struct CfutureValue_s
21 {
22   char core[CmiMsgHeaderSizeBytes];
23   struct Cfuture_data_s *data;
24   int valsize;
25   double rest[1];
26 }
27 *CfutureValue;
28
29 #define field_offset(t, f) ((size_t)(((t)0)->f))
30 #define void_to_value(v) ((CfutureValue)(((char*)v)-field_offset(CfutureValue,rest)))
31
32 CpvDeclare(int, CfutureStoreIndex);
33
34 Cfuture CfutureCreate()
35 {
36   futdata data = (futdata)malloc(sizeof(struct Cfuture_data_s));
37   Cfuture result;
38   _MEMCHECK(data);
39   data->value = 0;
40   data->ready = 0;
41   data->waiters = 0;
42   result.pe = CmiMyPe();
43   result.data = data;
44   return result;
45 }
46
47 static void CfutureAwaken(futdata data, CfutureValue val)
48 {
49   CthThread t;
50   data->value = val;
51   data->ready = 1;
52   for (t=data->waiters; t; t=CthGetNext(t))
53     CthAwaken(t);
54   data->waiters=0;
55 }
56
57 static void CfutureStore(CfutureValue m)
58 {
59   CfutureAwaken(m->data, m);
60 }
61
62 void *CfutureCreateBuffer(int bytes)
63 {
64   int valsize = sizeof(struct CfutureValue_s) + bytes;
65   CfutureValue m = (CfutureValue)CmiAlloc(valsize);
66   CmiSetHandler(m, CpvAccess(CfutureStoreIndex));
67   m->valsize = valsize;
68   return (void*)(m->rest);
69 }
70
71 void CfutureDestroyBuffer(void *v)
72 {
73   CmiFree(v);
74 }
75
76 void CfutureStoreBuffer(Cfuture f, void *value)
77 {
78   CfutureValue m = void_to_value(value);
79   if (f.pe == CmiMyPe()) {
80     CfutureAwaken(f.data, m);
81   } else {
82     m->data = f.data;
83     CmiSyncSendAndFree(f.pe, m->valsize, m);
84   }
85 }
86
87 void CfutureSet(Cfuture f, void *value, int len)
88 {
89   void *copy = CfutureCreateBuffer(len);
90   memcpy(copy, value, len);
91   CfutureStoreBuffer(f, copy);
92 }
93
94 void *CfutureWait(Cfuture f)
95 {
96   CthThread self; CfutureValue value; futdata data;
97   if (f.pe != CmiMyPe()) {
98     CmiPrintf("error: CfutureWait: future not local.\n");
99     exit(1);
100   }
101   data = f.data;
102   if (data->ready == 0) {
103     self = CthSelf();
104     CthSetNext(self, data->waiters);
105     data->waiters = self;
106     CthSuspend();
107   }
108   value = data->value;
109   return (void*)(value->rest);
110 }
111
112 void CfutureDestroy(Cfuture f)
113 {
114   if (f.pe != CmiMyPe()) {
115     CmiPrintf("error: CfutureDestroy: future not local.\n");
116     exit(1);
117   }
118   if (f.data->waiters) {
119     CmiPrintf("error: CfutureDestroy: destroying an active future.\n");
120     exit(1);
121   }
122   if (f.data->value) CmiFree(f.data->value);
123   free(f.data);
124 }
125
126 void CfutureModuleInit()
127 {
128   CpvInitialize(int, CfutureStoreIndex);
129   CpvAccess(CfutureStoreIndex) = CmiRegisterHandler((CmiHandler)CfutureStore);
130 }