fix bug in node level persistent
[charm.git] / src / arch / util / persist-comm.c
1 /** @file
2  * Support for persistent communication setup
3  * @ingroup Machine
4  */
5
6 /**
7  * \addtogroup Machine
8 */
9 /*@{*/
10
11 #include "converse.h"
12
13 #if CMK_PERSISTENT_COMM
14
15 #include "machine-persistent.h"
16
/* Send-side persistent slots, kept as a doubly linked list: getFreeSendSlot()
   appends new entries at the tail and CmiDestoryPersistent() unlinks them. */
17 CpvDeclare(PersistentSendsTable *, persistentSendsTableHead);
18 CpvDeclare(PersistentSendsTable *, persistentSendsTableTail);
/* Number of entries currently on the send-side list. */
19 CpvDeclare(int, persistentSendsTableCount);
/* Receive-side persistent slots, kept as a doubly linked list: getFreeRecvSlot()
   appends new entries at the tail and persistentDestoryHandler() unlinks them. */
20 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableHead);
21 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableTail);
/* Number of entries currently on the receive-side list. */
22 CpvDeclare(int, persistentReceivesTableCount);
23
24 /* Converse message type */
25 typedef struct _PersistentRequestMsg {
26   char core[CmiMsgHeaderSizeBytes];
27   int requestorPE;
28   int maxBytes;
29   PersistentHandle sourceHandler;
30 } PersistentRequestMsg;
31
32 typedef struct _PersistentReqGrantedMsg {
33   char core[CmiMsgHeaderSizeBytes];
34 /*
35   void *msgAddr[PERSIST_BUFFERS_NUM];
36   void *slotFlagAddress[PERSIST_BUFFERS_NUM];
37 */
38   PersistentBuf    buf[PERSIST_BUFFERS_NUM];
39   PersistentHandle sourceHandler;
40   PersistentHandle destHandler;
41 } PersistentReqGrantedMsg;
42
43 typedef struct _PersistentDestoryMsg {
44   char core[CmiMsgHeaderSizeBytes];
45   PersistentHandle destHandlerIndex;
46 } PersistentDestoryMsg;
47
48 /* Converse handler */
/* Converse handler indices; each is assigned by CmiRegisterHandler() in
   CmiPersistentInit() and attached to outgoing messages with CmiSetHandler(). */
49 int persistentRequestHandlerIdx;
50 int persistentReqGrantedHandlerIdx;
51 int persistentDestoryHandlerIdx;
52
/* Currently selected persistent handles: array pointer, number of entries,
   and index of the current one. Set by CmiUsePersistentHandle(); curphs is
   advanced by CmiPersistentOneSend(). */
53 CpvDeclare(PersistentHandle *, phs);
54 CpvDeclare(int, phsSize);
55 CpvDeclare(int, curphs);
56
57 /******************************************************************************
58      Utilities
59 ******************************************************************************/
60
61 extern void initRecvSlot(PersistentReceivesTable *slot);
62 extern void initSendSlot(PersistentSendsTable *slot);
63
64 void swapSendSlotBuffers(PersistentSendsTable *slot)
65 {
66   if (PERSIST_BUFFERS_NUM == 2) {
67 #if 0
68   void *tmp = slot->destAddress[0];
69   slot->destAddress[0] = slot->destAddress[1];
70   slot->destAddress[1] = tmp;
71   tmp = slot->destSizeAddress[0];
72   slot->destSizeAddress[0] = slot->destSizeAddress[1];
73   slot->destSizeAddress[1] = tmp;
74 #else
75   PersistentBuf tmp = slot->destBuf[0];
76   slot->destBuf[0] = slot->destBuf[1];
77   slot->destBuf[1] = tmp;
78 #endif
79   }
80 }
81
82 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
83 {
84   if (PERSIST_BUFFERS_NUM == 2) {
85 #if 0
86   void *tmp = slot->messagePtr[0];
87   slot->messagePtr[0] = slot->messagePtr[1];
88   slot->messagePtr[1] = tmp;
89   tmp = slot->recvSizePtr[0];
90   slot->recvSizePtr[0] = slot->recvSizePtr[1];
91   slot->recvSizePtr[1] = tmp;
92 #else
93   PersistentBuf tmp = slot->destBuf[0];
94   slot->destBuf[0] = slot->destBuf[1];
95   slot->destBuf[1] = tmp;
96 #endif
97   }
98 }
99
100 PersistentHandle getFreeSendSlot()
101 {
102   PersistentSendsTable *slot = (PersistentSendsTable *)malloc(sizeof(PersistentSendsTable));
103   initSendSlot(slot);
104   if (CpvAccess(persistentSendsTableHead) == NULL) {
105     CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = slot;
106   }
107   else {
108     CpvAccess(persistentSendsTableTail)->next = slot;
109     slot->prev = CpvAccess(persistentSendsTableTail);
110     CpvAccess(persistentSendsTableTail) = slot;
111   }
112   CpvAccess(persistentSendsTableCount)++;
113   return slot;
114 }
115
116 PersistentHandle getFreeRecvSlot()
117 {
118   PersistentReceivesTable *slot = (PersistentReceivesTable *)malloc(sizeof(PersistentReceivesTable));
119   initRecvSlot(slot);
120   if (CpvAccess(persistentReceivesTableHead) == NULL) {
121     CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = slot;
122   }
123   else {
124     CpvAccess(persistentReceivesTableTail)->next = slot;
125     slot->prev = CpvAccess(persistentReceivesTableTail);
126     CpvAccess(persistentReceivesTableTail) = slot;
127   }
128   CpvAccess(persistentReceivesTableCount)++;
129   return slot;
130 }
131
132 /******************************************************************************
133      Create Persistent Comm handler
134      When creating a persistent comm with destPE and maxSize:
135      1. The sender allocates a free PersistentSendsTable entry and sends a
136         PersistentRequestMsg message to destPE; persistent messages are
137         buffered until the persistent comm is set up.
138      2. destPE executes the Converse handler persistentRequestHandler() on
139         the PersistentRequestMsg message: it
140         allocates a free PersistentReceivesTable entry,
141         allocates a message buffer of size maxSize for the communication,
142         and sends back a PersistentReqGrantedMsg with the message address,
143         etc. for elan_put.
144      3. The Converse handler persistentReqGrantedHandler() is executed on
145         the sender for the PersistentReqGrantedMsg; setup is finished and
146         the buffered message is sent.
147 ******************************************************************************/
148
149 PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
150 {
151   PersistentHandle h;
152   PersistentSendsTable *slot;
153
154   if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
155
156 /*
157   if (CmiMyPe() == destPE) {
158     CmiPrintf("[%d] CmiCreatePersistent Error>  setting up persistent communication to the same processor is not allowed.\n", CmiMyPe());
159     CmiAbort("CmiCreatePersistent");
160   }
161 */
162
163   h = getFreeSendSlot();
164   slot = (PersistentSendsTable *)h;
165
166   slot->destPE = destPE;
167   slot->sizeMax = maxBytes;
168
169   PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
170   msg->maxBytes = maxBytes;
171   msg->sourceHandler = h;
172   msg->requestorPE = CmiMyPe();
173
174   CmiSetHandler(msg, persistentRequestHandlerIdx);
175   CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
176
177   return h;
178 }
179
180 /* for SMP */
181 PersistentHandle CmiCreateNodePersistent(int destNode, int maxBytes)
182 {
183     /* randomly pick one rank on the destination node is fine for setup.
184        actual message will be handled by comm thread anyway */
185   int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
186   return CmiCreatePersistent(pe, maxBytes);
187 }
188
189 static void persistentRequestHandler(void *env)
190 {             
191   PersistentRequestMsg *msg = (PersistentRequestMsg *)env;
192   char *buf;
193   int i;
194
195   PersistentHandle h = getFreeRecvSlot();
196   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
197   /*slot->messagePtr = elan_CmiStaticAlloc(msg->maxBytes);*/
198
199   /* build reply message */
200   PersistentReqGrantedMsg *gmsg = CmiAlloc(sizeof(PersistentReqGrantedMsg));
201
202   setupRecvSlot(slot, msg->maxBytes);
203
204   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
205 #if 0
206     gmsg->msgAddr[i] = slot->messagePtr[i];
207     gmsg->slotFlagAddress[i] = slot->recvSizePtr[i];
208 #else
209     gmsg->buf[i] = slot->destBuf[i];
210 #endif
211   }
212
213   gmsg->sourceHandler = msg->sourceHandler;
214   gmsg->destHandler = getPersistentHandle(h, 1);
215
216   CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
217   CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
218
219   CmiFree(msg);
220 }
221
222 static void persistentReqGrantedHandler(void *env)
223 {
224   int i;
225
226   PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
227   PersistentHandle h = msg->sourceHandler;
228   PersistentSendsTable *slot = (PersistentSendsTable *)h;
229
230   /* CmiPrintf("[%d] Persistent handler granted  h:%p\n", CmiMyPe(), h); */
231
232   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
233 #if 0
234     slot->destAddress[i] = msg->msgAddr[i];
235     slot->destSizeAddress[i] = msg->slotFlagAddress[i];
236 #else
237     slot->destBuf[i] = msg->buf[i];
238 #endif
239   }
240   slot->destHandle = msg->destHandler;
241
242   if (slot->messageBuf) {
243     LrtsSendPersistentMsg(h, CmiNodeOf(slot->destPE), slot->messageSize, slot->messageBuf);
244     slot->messageBuf = NULL;
245   }
246   CmiFree(msg);
247 }
248
249 /*
250   Another API:
251   receiver initiate the persistent communication
252 */
253 PersistentReq CmiCreateReceiverPersistent(int maxBytes)
254 {
255   PersistentReq ret;
256   int i;
257
258   PersistentHandle h = getFreeRecvSlot();
259   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
260
261   setupRecvSlot(slot, maxBytes);
262
263   ret.pe = CmiMyPe();
264   ret.maxBytes = maxBytes;
265   ret.myHand = h;
266   ret.bufPtr = (void **)malloc(PERSIST_BUFFERS_NUM*sizeof(void*));
267   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
268 #if 0
269     ret.messagePtr[i] = slot->messagePtr[i];
270     ret.recvSizePtr[i] = slot->recvSizePtr[i];
271 #else
272     ret.bufPtr[i] = malloc(sizeof(PersistentBuf));
273     memcpy(&ret.bufPtr[i], &slot->destBuf[i], sizeof(PersistentBuf));
274 #endif
275   }
276
277   return ret;
278 }
279
280 PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
281 {
282   int i;
283   PersistentHandle h = getFreeSendSlot();
284
285   PersistentSendsTable *slot = (PersistentSendsTable *)h;
286   slot->destPE = recvHand.pe;
287   slot->sizeMax = recvHand.maxBytes;
288
289 #if 0
290   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
291     slot->destAddress[i] = recvHand.messagePtr[i];
292     slot->destSizeAddress[i] = recvHand.recvSizePtr[i];
293   }
294 #else
295   memcpy(slot->destBuf, recvHand.bufPtr, PERSIST_BUFFERS_NUM*sizeof(PersistentBuf));
296 #endif
297   slot->destHandle = recvHand.myHand;
298   return h;
299 }
300
301 /******************************************************************************
302      destroy Persistent Comm handler
303 ******************************************************************************/
304
305 /* Converse Handler */
306 void persistentDestoryHandler(void *env)
307 {             
308   int i;
309   PersistentDestoryMsg *msg = (PersistentDestoryMsg *)env;
310   PersistentHandle h = getPersistentHandle(msg->destHandlerIndex, 0);
311   CmiAssert(h!=NULL);
312   CmiFree(msg);
313   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
314
315   CpvAccess(persistentReceivesTableCount) --;
316   if (slot->prev) {
317     slot->prev->next = slot->next;
318   }
319   else
320     CpvAccess(persistentReceivesTableHead) = slot->next;
321   if (slot->next) {
322     slot->next->prev = slot->prev;
323   }
324   else
325     CpvAccess(persistentReceivesTableTail) = slot->prev;
326
327   for (i=0; i<PERSIST_BUFFERS_NUM; i++) 
328     if (slot->destBuf[i].destAddress) /*elan_CmiStaticFree(slot->messagePtr);*/
329       PerFree((char*)slot->destBuf[i].destAddress);
330
331   clearRecvSlot(slot);
332
333   free(slot);
334 }
335
336 /* FIXME: need to buffer until ReqGranted message come back? */
337 void CmiDestoryPersistent(PersistentHandle h)
338 {
339   if (h == NULL) return;
340
341   PersistentSendsTable *slot = (PersistentSendsTable *)h;
342   /* CmiAssert(slot->destHandle != 0); */
343
344   PersistentDestoryMsg *msg = (PersistentDestoryMsg *)
345                               CmiAlloc(sizeof(PersistentDestoryMsg));
346   msg->destHandlerIndex = slot->destHandle;
347
348   CmiSetHandler(msg, persistentDestoryHandlerIdx);
349   CmiSyncSendAndFree(slot->destPE,sizeof(PersistentDestoryMsg),msg);
350
351   /* free this slot */
352   if (slot->prev) {
353     slot->prev->next = slot->next;
354   }
355   else
356     CpvAccess(persistentSendsTableHead) = slot->next;
357   if (slot->next) {
358     slot->next->prev = slot->prev;
359   }
360   else
361     CpvAccess(persistentSendsTableTail) = slot->prev;
362   free(slot);
363
364   CpvAccess(persistentSendsTableCount) --;
365 }
366
367
368 void CmiDestoryAllPersistent()
369 {
370   PersistentSendsTable *sendslot = CpvAccess(persistentSendsTableHead);
371   while (sendslot) {
372     PersistentSendsTable *next = sendslot->next;
373     free(sendslot);
374     sendslot = next;
375   }
376   CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
377   CpvAccess(persistentSendsTableCount) = 0;
378
379   PersistentReceivesTable *slot = CpvAccess(persistentReceivesTableHead);
380   while (slot) {
381     PersistentReceivesTable *next = slot->next;
382     int i;
383     for (i=0; i<PERSIST_BUFFERS_NUM; i++)  {
384       if (slot->destBuf[i].destSizeAddress)
385         CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
386       if (slot->destBuf[i].destAddress) PerFree((char*)slot->destBuf[i].destAddress);
387     }
388     free(slot);
389     slot = next;
390   }
391   CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
392   CpvAccess(persistentReceivesTableCount) = 0;
393 }
394
395 void CmiPersistentInit()
396 {
397   int i;
398
399   persistentRequestHandlerIdx = 
400        CmiRegisterHandler((CmiHandler)persistentRequestHandler);
401   persistentReqGrantedHandlerIdx = 
402        CmiRegisterHandler((CmiHandler)persistentReqGrantedHandler);
403   persistentDestoryHandlerIdx = 
404        CmiRegisterHandler((CmiHandler)persistentDestoryHandler);
405
406  
407   CpvInitialize(PersistentHandle*, phs);
408   CpvAccess(phs) = NULL;
409   CpvInitialize(int, phsSize);
410   CpvInitialize(int, curphs);
411   CpvAccess(curphs) = 0;
412
413   persist_machine_init();
414
415   CpvInitialize(PersistentSendsTable *, persistentSendsTableHead);
416   CpvInitialize(PersistentSendsTable *, persistentSendsTableTail);
417   CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
418   CpvInitialize(int, persistentSendsTableCount);
419   CpvAccess(persistentSendsTableCount) = 0;
420
421   CpvInitialize(PersistentReceivesTable *, persistentReceivesTableHead);
422   CpvInitialize(PersistentReceivesTable *, persistentReceivesTableTail);
423   CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
424   CpvInitialize(int, persistentReceivesTableCount);
425   CpvAccess(persistentReceivesTableCount) = 0;
426 }
427
428
429 void CmiUsePersistentHandle(PersistentHandle *p, int n)
430 {
431   if (n==1 && *p == NULL) { p = NULL; n = 0; }
432 #if  CMK_ERROR_CHECKING && 0
433   {
434   int i;
435   for (i=0; i<n; i++)
436     if (p[i] == NULL) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
437   }
438 #endif
439   CpvAccess(phs) = p;
440   CpvAccess(phsSize) = n;
441   CpvAccess(curphs) = 0;
442 }
443
444 void CmiPersistentOneSend()
445 {
446   if (CpvAccess(phs)) CpvAccess(curphs)++;
447 }
448
449 #endif
450
451 /*@}*/