Added nice header using CVS keywords for *.[cCh] files.
[charm.git] / src / conv-core / futures.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include "converse.h"
9
10 typedef struct Cfuture_data_s
11 {
12   void      *value;
13   int        ready;
14   CthThread  waiters;
15 }
16 *futdata;
17
18 typedef struct CfutureValue_s
19 {
20   char core[CmiMsgHeaderSizeBytes];
21   struct Cfuture_data_s *data;
22   int valsize;
23   double rest[1];
24 }
25 *CfutureValue;
26
27 #define field_offset(t, f) ((CMK_SIZE_T)(((t)0)->f))
28 #define void_to_value(v) ((CfutureValue)(((char*)v)-field_offset(CfutureValue,rest)))
29
30 CpvDeclare(int, CfutureStoreIndex);
31
32 Cfuture CfutureCreate()
33 {
34   futdata data = (futdata)malloc(sizeof(struct Cfuture_data_s));
35   Cfuture result;
36   _MEMCHECK(data);
37   data->value = 0;
38   data->ready = 0;
39   data->waiters = 0;
40   result.pe = CmiMyPe();
41   result.data = data;
42   return result;
43 }
44
45 static void CfutureAwaken(futdata data, CfutureValue val)
46 {
47   CthThread t;
48   data->value = val;
49   data->ready = 1;
50   for (t=data->waiters; t; t=CthGetNext(t))
51     CthAwaken(t);
52   data->waiters=0;
53 }
54
55 static void CfutureStore(CfutureValue m)
56 {
57   CmiGrabBuffer((void **)&m);
58   CfutureAwaken(m->data, m);
59 }
60
61 void *CfutureCreateBuffer(int bytes)
62 {
63   int valsize = sizeof(struct CfutureValue_s) + bytes;
64   CfutureValue m = (CfutureValue)CmiAlloc(valsize);
65   CmiSetHandler(m, CpvAccess(CfutureStoreIndex));
66   m->valsize = valsize;
67   return (void*)(m->rest);
68 }
69
70 void CfutureDestroyBuffer(void *v)
71 {
72   CmiFree(v);
73 }
74
75 void CfutureStoreBuffer(Cfuture f, void *value)
76 {
77   CfutureValue m = void_to_value(value);
78   if (f.pe == CmiMyPe()) {
79     CfutureAwaken(f.data, m);
80   } else {
81     m->data = f.data;
82     CmiSyncSendAndFree(f.pe, m->valsize, m);
83   }
84 }
85
86 void CfutureSet(Cfuture f, void *value, int len)
87 {
88   void *copy = CfutureCreateBuffer(len);
89   memcpy(copy, value, len);
90   CfutureStoreBuffer(f, copy);
91 }
92
93 void *CfutureWait(Cfuture f)
94 {
95   CthThread self; CfutureValue value; futdata data;
96   if (f.pe != CmiMyPe()) {
97     CmiPrintf("error: CfutureWait: future not local.\n");
98     exit(1);
99   }
100   data = f.data;
101   if (data->ready == 0) {
102     self = CthSelf();
103     CthSetNext(self, data->waiters);
104     data->waiters = self;
105     CthSuspend();
106   }
107   value = data->value;
108   return (void*)(value->rest);
109 }
110
111 void CfutureDestroy(Cfuture f)
112 {
113   if (f.pe != CmiMyPe()) {
114     CmiPrintf("error: CfutureDestroy: future not local.\n");
115     exit(1);
116   }
117   if (f.data->waiters) {
118     CmiPrintf("error: CfutureDestroy: destroying an active future.\n");
119     exit(1);
120   }
121   if (f.data->value) CmiFree(f.data->value);
122   free(f.data);
123 }
124
125 void CfutureModuleInit()
126 {
127   CpvInitialize(int, CfutureStoreIndex);
128   CpvAccess(CfutureStoreIndex) = CmiRegisterHandler(CfutureStore);
129 }