Merge branches 'charm' and 'charm-mpi-interop'
[charm.git] / src / conv-core / futures.c
1 #include <stdlib.h>
2 #include <string.h>
3 #include "converse.h"
4
5 typedef struct Cfuture_data_s
6 {
7   void      *value;
8   int        ready;
9   CthThread  waiters;
10 }
11 *futdata;
12
13 typedef struct CfutureValue_s
14 {
15   char core[CmiMsgHeaderSizeBytes];
16   struct Cfuture_data_s *data;
17   int valsize;
18   double rest[1];
19 }
20 *CfutureValue;
21
22 #define field_offset(t, f) ((size_t)(((t)0)->f))
23 #define void_to_value(v) ((CfutureValue)(((char*)v)-field_offset(CfutureValue,rest)))
24
25 CpvDeclare(int, CfutureStoreIndex);
26
27 Cfuture CfutureCreate()
28 {
29   futdata data = (futdata)malloc(sizeof(struct Cfuture_data_s));
30   Cfuture result;
31   _MEMCHECK(data);
32   data->value = 0;
33   data->ready = 0;
34   data->waiters = 0;
35   result.pe = CmiMyPe();
36   result.data = data;
37   return result;
38 }
39
40 static void CfutureAwaken(futdata data, CfutureValue val)
41 {
42   CthThread t;
43   data->value = val;
44   data->ready = 1;
45   for (t=data->waiters; t; t=CthGetNext(t))
46     CthAwaken(t);
47   data->waiters=0;
48 }
49
50 static void CfutureStore(CfutureValue m)
51 {
52   CfutureAwaken(m->data, m);
53 }
54
55 void *CfutureCreateBuffer(int bytes)
56 {
57   int valsize = sizeof(struct CfutureValue_s) + bytes;
58   CfutureValue m = (CfutureValue)CmiAlloc(valsize);
59   CmiSetHandler(m, CpvAccess(CfutureStoreIndex));
60   m->valsize = valsize;
61   return (void*)(m->rest);
62 }
63
64 void CfutureDestroyBuffer(void *v)
65 {
66   CmiFree(v);
67 }
68
69 void CfutureStoreBuffer(Cfuture f, void *value)
70 {
71   CfutureValue m = void_to_value(value);
72   if (f.pe == CmiMyPe()) {
73     CfutureAwaken(f.data, m);
74   } else {
75     m->data = f.data;
76     CmiSyncSendAndFree(f.pe, m->valsize, m);
77   }
78 }
79
80 void CfutureSet(Cfuture f, void *value, int len)
81 {
82   void *copy = CfutureCreateBuffer(len);
83   memcpy(copy, value, len);
84   CfutureStoreBuffer(f, copy);
85 }
86
87 void *CfutureWait(Cfuture f)
88 {
89   CthThread self; CfutureValue value; futdata data;
90   if (f.pe != CmiMyPe()) {
91     CmiPrintf("error: CfutureWait: future not local.\n");
92     exit(1);
93   }
94   data = f.data;
95   if (data->ready == 0) {
96     self = CthSelf();
97     CthSetNext(self, data->waiters);
98     data->waiters = self;
99     CthSuspend();
100   }
101   value = data->value;
102   return (void*)(value->rest);
103 }
104
105 void CfutureDestroy(Cfuture f)
106 {
107   if (f.pe != CmiMyPe()) {
108     CmiPrintf("error: CfutureDestroy: future not local.\n");
109     exit(1);
110   }
111   if (f.data->waiters) {
112     CmiPrintf("error: CfutureDestroy: destroying an active future.\n");
113     exit(1);
114   }
115   if (f.data->value) CmiFree(f.data->value);
116   free(f.data);
117 }
118
119 void CfutureModuleInit()
120 {
121   CpvInitialize(int, CfutureStoreIndex);
122   CpvAccess(CfutureStoreIndex) = CmiRegisterHandler((CmiHandler)CfutureStore);
123 }