fixed a bug in pxshm
[charm.git] / src / arch / util / persist-comm.c
1 /** @file
2  * Support for persistent communication setup
3  * @ingroup Machine
4  */
5
6 /**
7  * \addtogroup Machine
8 */
9 /*@{*/
10
11 #include "converse.h"
12
13 #if CMK_PERSISTENT_COMM
14
15 #include "gni_pub.h"
16 #include "machine-persistent.h"
17
18 #define TABLESIZE  512
19 PersistentSendsTable persistentSendsTable[TABLESIZE];
20 int persistentSendsTableCount = 0;
21 PersistentReceivesTable *persistentReceivesTableHead;
22 PersistentReceivesTable *persistentReceivesTableTail;
23 int persistentReceivesTableCount = 0;
24
25 /* Converse message type */
26 typedef struct _PersistentRequestMsg {
27   char core[CmiMsgHeaderSizeBytes];
28   int requestorPE;
29   int maxBytes;
30   PersistentHandle sourceHandlerIndex;
31 } PersistentRequestMsg;
32
33 typedef struct _PersistentReqGrantedMsg {
34   char core[CmiMsgHeaderSizeBytes];
35 /*
36   void *msgAddr[PERSIST_BUFFERS_NUM];
37   void *slotFlagAddress[PERSIST_BUFFERS_NUM];
38 */
39   PersistentBuf    buf[PERSIST_BUFFERS_NUM];
40   PersistentHandle sourceHandlerIndex;
41   PersistentHandle destHandlerIndex;
42 } PersistentReqGrantedMsg;
43
44 typedef struct _PersistentDestoryMsg {
45   char core[CmiMsgHeaderSizeBytes];
46   PersistentHandle destHandlerIndex;
47 } PersistentDestoryMsg;
48
49 /* Converse handler */
50 int persistentRequestHandlerIdx;
51 int persistentReqGrantedHandlerIdx;
52 int persistentDestoryHandlerIdx;
53
54 PersistentHandle  *phs = NULL;
55 int phsSize;
56
57 /******************************************************************************
58      Utilities
59 ******************************************************************************/
60
61 void initSendSlot(PersistentSendsTable *slot)
62 {
63   int i;
64   slot->used = 0;
65   slot->destPE = -1;
66   slot->sizeMax = 0;
67   slot->destHandle = 0; 
68 #if 0
69   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
70     slot->destAddress[i] = NULL;
71     slot->destSizeAddress[i] = NULL;
72   }
73 #endif
74   memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
75   slot->messageBuf = 0;
76   slot->messageSize = 0;
77 }
78
79 void swapSendSlotBuffers(PersistentSendsTable *slot)
80 {
81   if (PERSIST_BUFFERS_NUM == 2) {
82 #if 0
83   void *tmp = slot->destAddress[0];
84   slot->destAddress[0] = slot->destAddress[1];
85   slot->destAddress[1] = tmp;
86   tmp = slot->destSizeAddress[0];
87   slot->destSizeAddress[0] = slot->destSizeAddress[1];
88   slot->destSizeAddress[1] = tmp;
89 #else
90   PersistentBuf tmp = slot->destBuf[0];
91   slot->destBuf[0] = slot->destBuf[1];
92   slot->destBuf[1] = tmp;
93 #endif
94   }
95 }
96
97 void initRecvSlot(PersistentReceivesTable *slot)
98 {
99   int i;
100 #if 0
101   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
102     slot->messagePtr[i] = NULL;
103     slot->recvSizePtr[i] = NULL;
104   }
105 #endif
106   memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
107   slot->sizeMax = 0;
108   slot->prev = slot->next = NULL;
109 }
110
111 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
112 {
113   if (PERSIST_BUFFERS_NUM == 2) {
114 #if 0
115   void *tmp = slot->messagePtr[0];
116   slot->messagePtr[0] = slot->messagePtr[1];
117   slot->messagePtr[1] = tmp;
118   tmp = slot->recvSizePtr[0];
119   slot->recvSizePtr[0] = slot->recvSizePtr[1];
120   slot->recvSizePtr[1] = tmp;
121 #else
122   PersistentBuf tmp = slot->destBuf[0];
123   slot->destBuf[0] = slot->destBuf[1];
124   slot->destBuf[1] = tmp;
125 #endif
126   }
127 }
128
129 PersistentHandle getFreeSendSlot()
130 {
131   int i;
132   if (persistentSendsTableCount == TABLESIZE) CmiAbort("persistentSendsTable full.\n");
133   persistentSendsTableCount++;
134   for (i=1; i<TABLESIZE; i++)
135     if (persistentSendsTable[i].used == 0) break;
136   return &persistentSendsTable[i];
137 }
138
139 PersistentHandle getFreeRecvSlot()
140 {
141   PersistentReceivesTable *slot = (PersistentReceivesTable *)CmiAlloc(sizeof(PersistentReceivesTable));
142   initRecvSlot(slot);
143   if (persistentReceivesTableHead == NULL) {
144     persistentReceivesTableHead = persistentReceivesTableTail = slot;
145   }
146   else {
147     persistentReceivesTableTail->next = slot;
148     slot->prev = persistentReceivesTableTail;
149     persistentReceivesTableTail = slot;
150   }
151   persistentReceivesTableCount++;
152   return slot;
153 }
154
155 /******************************************************************************
156      Create Persistent Comm handler
157      When creating a persistent comm with destPE and maxSize
158      1. allocate a free PersistentSendsTable entry, send a 
159         PersistentRequestMsg message to destPE
160         buffer persistent message before  Persistent Comm is setup;
161      2. destPE execute Converse handler persistentRequestHandler() on the
162         PersistentRequestMsg message:
163         allocate a free PersistentReceivesTable entry;
164         allocate a message buffer of size maxSize for the communication;
165         Send back a PersistentReqGrantedMsg with message address, etc for
166         elan_put;
167      3. Converse handler persistentReqGrantedHandler() executed on
168         sender for the PersistentReqGrantedMsg. setup finish, send buffered
169         message.
170 ******************************************************************************/
171
172 PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
173 {
174   PersistentHandle h = getFreeSendSlot();
175
176   PersistentSendsTable *slot = (PersistentSendsTable *)h;
177
178   if (CmiMyPe() == destPE) {
179     CmiAbort("CmiCreatePersistent Error: setting up persistent communication to the same processor is not allowed.");
180   }
181
182   slot->used = 1;
183   slot->destPE = destPE;
184   slot->sizeMax = maxBytes;
185
186   PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
187   msg->maxBytes = maxBytes;
188   msg->sourceHandlerIndex = h;
189   msg->requestorPE = CmiMyPe();
190
191   CmiSetHandler(msg, persistentRequestHandlerIdx);
192   CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
193
194   return h;
195 }
196
197 static void persistentRequestHandler(void *env)
198 {             
199   PersistentRequestMsg *msg = (PersistentRequestMsg *)env;
200   char *buf;
201   int i;
202
203   PersistentHandle h = getFreeRecvSlot();
204   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
205   /*slot->messagePtr = elan_CmiStaticAlloc(msg->maxBytes);*/
206
207   /* build reply message */
208   PersistentReqGrantedMsg *gmsg = CmiAlloc(sizeof(PersistentReqGrantedMsg));
209
210   setupRecvSlot(slot, msg->maxBytes);
211
212   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
213 #if 0
214     gmsg->msgAddr[i] = slot->messagePtr[i];
215     gmsg->slotFlagAddress[i] = slot->recvSizePtr[i];
216 #else
217     gmsg->buf[i] = slot->destBuf[i];
218 #endif
219   }
220
221   gmsg->sourceHandlerIndex = msg->sourceHandlerIndex;
222   gmsg->destHandlerIndex = h;
223
224   CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
225   CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
226
227   CmiFree(msg);
228 }
229
230 static void persistentReqGrantedHandler(void *env)
231 {
232   int i;
233   /*CmiPrintf("Persistent handler granted\n");*/
234   PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
235   PersistentHandle h = msg->sourceHandlerIndex;
236   PersistentSendsTable *slot = (PersistentSendsTable *)h;
237   CmiAssert(slot->used == 1);
238   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
239 #if 0
240     slot->destAddress[i] = msg->msgAddr[i];
241     slot->destSizeAddress[i] = msg->slotFlagAddress[i];
242 #else
243     slot->destBuf[i] = msg->buf[i];
244 #endif
245   }
246   slot->destHandle = msg->destHandlerIndex;
247
248   if (slot->messageBuf) {
249     LrtsSendPersistentMsg(h, slot->destPE, slot->messageSize, slot->messageBuf);
250     slot->messageBuf = NULL;
251   }
252   CmiFree(msg);
253 }
254
255 /*
256   Another API:
257   receiver initiate the persistent communication
258 */
259 PersistentReq CmiCreateReceiverPersistent(int maxBytes)
260 {
261   PersistentReq ret;
262   int i;
263
264   PersistentHandle h = getFreeRecvSlot();
265   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
266
267   setupRecvSlot(slot, maxBytes);
268
269   ret.pe = CmiMyPe();
270   ret.maxBytes = maxBytes;
271   ret.myHand = h;
272   ret.bufPtr = (void **)malloc(PERSIST_BUFFERS_NUM*sizeof(void*));
273   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
274 #if 0
275     ret.messagePtr[i] = slot->messagePtr[i];
276     ret.recvSizePtr[i] = slot->recvSizePtr[i];
277 #else
278     ret.bufPtr[i] = malloc(sizeof(PersistentBuf));
279     memcpy(&ret.bufPtr[i], &slot->destBuf[i], sizeof(PersistentBuf));
280 #endif
281   }
282
283   return ret;
284 }
285
286 PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
287 {
288   int i;
289   PersistentHandle h = getFreeSendSlot();
290
291   PersistentSendsTable *slot = (PersistentSendsTable *)h;
292   slot->used = 1;
293   slot->destPE = recvHand.pe;
294   slot->sizeMax = recvHand.maxBytes;
295
296 #if 0
297   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
298     slot->destAddress[i] = recvHand.messagePtr[i];
299     slot->destSizeAddress[i] = recvHand.recvSizePtr[i];
300   }
301 #else
302   memcpy(slot->destBuf, recvHand.bufPtr, PERSIST_BUFFERS_NUM*sizeof(PersistentBuf));
303 #endif
304   slot->destHandle = recvHand.myHand;
305   return h;
306 }
307
308 /******************************************************************************
309      destory Persistent Comm handler
310 ******************************************************************************/
311
312 /* Converse Handler */
313 void persistentDestoryHandler(void *env)
314 {             
315   int i;
316   PersistentDestoryMsg *msg = (PersistentDestoryMsg *)env;
317   PersistentHandle h = msg->destHandlerIndex;
318   CmiAssert(h!=NULL);
319   CmiFree(msg);
320   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
321
322   persistentReceivesTableCount --;
323   if (slot->prev) {
324     slot->prev->next = slot->next;
325   }
326   else
327    persistentReceivesTableHead = slot->next;
328   if (slot->next) {
329     slot->next->prev = slot->prev;
330   }
331   else
332     persistentReceivesTableTail = slot->prev;
333
334   for (i=0; i<PERSIST_BUFFERS_NUM; i++) 
335     if (slot->destBuf[i].destAddress) /*elan_CmiStaticFree(slot->messagePtr);*/
336       PerFree((char*)slot->destBuf[i].destAddress);
337
338   CmiFree(slot);
339 }
340
341 /* FIXME: need to buffer until ReqGranted message come back? */
342 void CmiDestoryPersistent(PersistentHandle h)
343 {
344   if (h == 0) CmiAbort("CmiDestoryPersistent: not a valid PersistentHandle\n");
345
346   PersistentSendsTable *slot = (PersistentSendsTable *)h;
347   CmiAssert(slot->destHandle != 0);
348
349   PersistentDestoryMsg *msg = (PersistentDestoryMsg *)
350                               CmiAlloc(sizeof(PersistentDestoryMsg));
351   msg->destHandlerIndex = slot->destHandle;
352
353   CmiSetHandler(msg, persistentDestoryHandlerIdx);
354   CmiSyncSendAndFree(slot->destPE,sizeof(PersistentDestoryMsg),msg);
355
356   /* free this slot */
357   initSendSlot(slot);
358
359   persistentSendsTableCount --;
360 }
361
362
363 void CmiDestoryAllPersistent()
364 {
365   int i;
366   for (i=0; i<TABLESIZE; i++) {
367     if (persistentSendsTable[i].messageBuf) 
368       CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered unsend message.\n");
369     initSendSlot(&persistentSendsTable[i]);
370   }
371   persistentSendsTableCount = 0;
372
373   PersistentReceivesTable *slot = persistentReceivesTableHead;
374   while (slot) {
375     PersistentReceivesTable *next = slot->next;
376     int i;
377     for (i=0; i<PERSIST_BUFFERS_NUM; i++)  {
378       if (slot->destBuf[i].destSizeAddress)
379         CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
380       if (slot->destBuf[i].destAddress) PerFree((char*)slot->destBuf[i].destAddress);
381     }
382     CmiFree(slot);
383     slot = next;
384   }
385   persistentReceivesTableHead = persistentReceivesTableTail = NULL;
386   persistentReceivesTableCount = 0;
387 }
388
389 void CmiPersistentInit()
390 {
391   int i;
392   persistentRequestHandlerIdx = 
393        CmiRegisterHandler((CmiHandler)persistentRequestHandler);
394   persistentReqGrantedHandlerIdx = 
395        CmiRegisterHandler((CmiHandler)persistentReqGrantedHandler);
396   persistentDestoryHandlerIdx = 
397        CmiRegisterHandler((CmiHandler)persistentDestoryHandler);
398
399   persist_machine_init();
400
401   for (i=0; i<TABLESIZE; i++) {
402     initSendSlot(&persistentSendsTable[i]);
403   }
404   persistentSendsTableCount = 0;
405   persistentReceivesTableHead = persistentReceivesTableTail = NULL;
406   persistentReceivesTableCount = 0;
407 }
408
409
410 void CmiUsePersistentHandle(PersistentHandle *p, int n)
411 {
412   if (n==1 && *p == NULL) { p = NULL; n = 0; }
413 #if  CMK_ERROR_CHECKING
414   {
415   int i;
416   for (i=0; i<n; i++)
417     if (p[i] == NULL) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
418   }
419 #endif
420   phs = p;
421   phsSize = n;
422 }
423
424 #endif
425
426 /*@}*/