move to common gemini arch directory
[charm.git] / src / arch / elan / persistent.c
1 /** @file
2  * Elan persistent communication
3  * @ingroup Machine
4 */
5
6 /*
7   included in machine.c
8   common code for persistent communication now is moved to persist-comm.c
9   Gengbin Zheng, 12/5/2003
10 */
11
12 /*
13   machine specific persistent comm functions:
14   * CmiSendPersistentMsg
15   * CmiSyncSendPersistent
16   * PumpPersistent
17   * PerAlloc PerFree      // persistent message memory allocation/free functions
18   * persist_machine_init  // machine specific initialization call
19 */
20
21 #define STRATEGY_ONE_ELANPUT   0
22 #define STRATEGY_TWO_ELANPUT   1
23
24 /****************************************************************************
25        Pending send messages
26 ****************************************************************************/
27 typedef struct pmsg_list {
28   ELAN_EVENT *e;
29   char *msg;
30   struct pmsg_list *next;
31   int size, destpe;
32   void *addr;
33   PersistentHandle  h;
34   int strategy, phase;
35 } PMSG_LIST;
36
37 static PMSG_LIST *pending_persistent_msgs = NULL;
38 static PMSG_LIST *end_pending_persistent_msgs = NULL;
39 static PMSG_LIST *free_list_head = NULL;
40
41 /* free_list_head keeps a list of reusable PMSG_LIST */
42 #define NEW_PMSG_LIST(evt, m, s, dest, _addr, ph, stra) \
43   if (free_list_head) { msg_tmp = free_list_head; free_list_head=free_list_head->next; }  \
44   else msg_tmp = (PMSG_LIST *) CmiAlloc(sizeof(PMSG_LIST));     \
45   msg_tmp->msg = m;     \
46   msg_tmp->e = evt;     \
47   msg_tmp->size = s;    \
48   msg_tmp->next = NULL; \
49   msg_tmp->destpe = dest;       \
50   msg_tmp->addr = _addr;        \
51   msg_tmp->h = ph;              \
52   msg_tmp->phase = 0;   \
53   msg_tmp->strategy = stra;     
54
55 #define APPEND_PMSG_LIST(msg_tmp)       \
56   if (pending_persistent_msgs==0)       \
57     pending_persistent_msgs = msg_tmp;  \
58   else  \
59     end_pending_persistent_msgs->next = msg_tmp;        \
60   end_pending_persistent_msgs = msg_tmp;
61
62
63 void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
64 {
65   CmiAssert(h!=NULL);
66   PersistentSendsTable *slot = (PersistentSendsTable *)h;
67   CmiAssert(slot->used == 1);
68   CmiAssert(slot->destPE == destPE);
69   if (size > slot->sizeMax) {
70     CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
71     CmiAbort("Abort: Invalid size\n");
72   }
73
74 /*CmiPrintf("[%d] CmiSendPersistentMsg h=%p hdl=%d destPE=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), destPE, slot->destAddress[0], size);*/
75
76   if (slot->destAddress[0]) {
77     ELAN_EVENT *e1, *e2;
78     int strategy = STRATEGY_ONE_ELANPUT;
79     /* if (size > 280) strategy = STRATEGY_TWO_ELANPUT; */
80     int *footer = (int*)((char*)m + size);
81     footer[0] = size;
82     footer[1] = 1;
83     if (strategy == STRATEGY_ONE_ELANPUT) CMI_MESSAGE_SIZE(m) = size;
84     else CMI_MESSAGE_SIZE(m) = 0;
85     e1 = elan_put(elan_base->state, m, slot->destAddress[0], size+sizeof(int)*2, destPE);
86     switch (strategy ) {
87     case STRATEGY_ONE_ELANPUT:
88     case STRATEGY_TWO_ELANPUT:  {
89       PMSG_LIST *msg_tmp;
90       NEW_PMSG_LIST(e1, m, size, destPE, slot->destSizeAddress[0], h, strategy);
91       APPEND_PMSG_LIST(msg_tmp);
92       swapSendSlotBuffers(slot);
93       break;
94       }
95     case 2:
96       elan_wait(e1, ELAN_POLL_EVENT);
97       e2 = elan_put(elan_base->state, &size, slot->destSizeAddress[0], sizeof(int), destPE);
98       elan_wait(e2, ELAN_POLL_EVENT);
99       CMI_MESSAGE_SIZE(m) = 0;
100       /*CmiPrintf("[%d] elan finished. \n", CmiMyPe());*/
101       CmiFree(m);
102     }
103   }
104   else {
105 #if 1
106     if (slot->messageBuf != NULL) {
107       CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
108       CmiAbort("");
109     }
110     slot->messageBuf = m;
111     slot->messageSize = size;
112 #else
113     /* normal send */
114     PersistentHandle  *phs_tmp = phs;
115     int phsSize_tmp = phsSize;
116     phs = NULL; phsSize = 0;
117     CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
118     CmiSyncSendAndFree(slot->destPE, size, m);
119     phs = phs_tmp; phsSize = phsSize_tmp;
120 #endif
121   }
122 }
123
124 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
125 {
126   CmiState cs = CmiGetState();
127   char *dupmsg = (char *) CmiAlloc(size);
128   memcpy(dupmsg, msg, size);
129
130   /*  CmiPrintf("Setting root to %d\n", 0); */
131   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
132
133   if (cs->pe==destPE) {
134     CQdCreate(CpvAccess(cQdState), 1);
135     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
136   }
137   else
138     CmiSendPersistentMsg(h, destPE, size, dupmsg);
139 }
140
141 /* 
142   1: finish the first put but still need to be in the queue for the second put.
143   2: finish and should be removed from queue.
144 */
145 static int remote_put_done(PMSG_LIST *smsg)
146 {
147   int flag = elan_poll(smsg->e, ELAN_POLL_EVENT);
148   if (flag) {
149       switch (smsg->strategy) {
150       case 0: 
151         smsg->phase = 1;
152         CmiFree(smsg->msg);
153         return 2;
154       case 1:
155         if (smsg->phase == 1) {
156           /*
157             CmiPrintf("remote_put_done on %d\n", CmiMyPe());
158           */
159           return 2;
160         }
161         else {
162           smsg->phase = 1;
163           CmiFree(smsg->msg);
164                                                                                 
165           PersistentSendsTable *slot = (PersistentSendsTable *)(smsg->h);
166           smsg->e = elan_put(elan_base->state, &smsg->size, smsg->addr, sizeof(int), smsg->destpe);
167           return 1;
168         }
169      }
170   }
171   return 0;
172 }
173
174 /* called in CmiReleaseSentMessages */
175 void release_pmsg_list()
176 {
177   PMSG_LIST *prev=0, *temp;
178   PMSG_LIST *msg_tmp = pending_persistent_msgs;
179
180   while (msg_tmp) {
181     int status = remote_put_done(msg_tmp);
182     if (status == 2) {
183       temp = msg_tmp->next;
184       if (prev==0)
185         pending_persistent_msgs = temp;
186       else
187         prev->next = temp;
188       /*CmiFree(msg_tmp);*/
189       if (free_list_head) { msg_tmp->next = free_list_head; free_list_head = msg_tmp; }
190       else free_list_head = msg_tmp;
191       msg_tmp = temp;
192     }
193     else {
194       prev = msg_tmp;
195       msg_tmp = msg_tmp->next;
196     }
197   }
198   end_pending_persistent_msgs = prev;
199 }
200
201 extern void CmiReference(void *blk);
202
203 /* called in PumpMsgs */
204 int PumpPersistent()
205 {
206   int status = 0;
207   PersistentReceivesTable *slot = persistentReceivesTableHead;
208   while (slot) {
209     char *msg = slot->messagePtr[0];
210     int size = *(slot->recvSizePtr[0]);
211     if (size)
212     {
213       int *footer = (int*)(msg + size);
214       if (footer[0] == size && footer[1] == 1) {
215 /*CmiPrintf("[%d] PumpPersistent messagePtr=%p size:%d\n", CmiMyPe(), slot->messagePtr, size);*/
216
217 #if 0
218       void *dupmsg;
219       dupmsg = CmiAlloc(size);
220                                                                                 
221       _MEMCHECK(dupmsg);
222       memcpy(dupmsg, msg, size);
223       memset(msg, 0, size+2*sizeof(int));
224       msg = dupmsg;
225 #else
226       /* return messagePtr directly and user MUST make sure not to delete it. */
227       /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
228
229       CmiReference(msg);
230       swapRecvSlotBuffers(slot);
231 #endif
232
233       CmiPushPE(CMI_DEST_RANK(msg), msg);
234 #if CMK_BROADCAST_SPANNING_TREE
235       if (CMI_BROADCAST_ROOT(msg))
236           SendSpanningChildren(size, msg);
237 #endif
238       /* clear footer after message used */
239       *(slot->recvSizePtr[0]) = 0;
240       footer[0] = footer[1] = 0;
241
242 #if 0
243       /* not safe at all! */
244       /* instead of clear before use, do it earlier */
245       msg=slot->messagePtr[0];
246       size = *(slot->recvSizePtr[0]);
247       footer = (int*)(msg + size);
248       *(slot->recvSizePtr[0]) = 0;
249       footer[0] = footer[1] = 0;
250 #endif
251       status = 1;
252       }
253     }
254     slot = slot->next;
255   }
256   return status;
257 }
258
259 void *PerAlloc(int size)
260 {
261   return CmiAlloc(size);
262 }
263                                                                                 
264 void PerFree(char *msg)
265 {
266   elan_CmiStaticFree(msg);
267 }
268
269 /* machine dependent init call */
270 void persist_machine_init(void)
271 {
272 }
273
274 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
275 {
276   int i;
277   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
278     char *buf = PerAlloc(maxBytes+sizeof(int)*2);
279     _MEMCHECK(buf);
280     memset(buf, 0, maxBytes+sizeof(int)*2);
281     slot->messagePtr[i] = buf;
282     /* note: assume first integer in elan converse header is the msg size */
283     slot->recvSizePtr[i] = (unsigned int*)buf;
284   }
285   slot->sizeMax = maxBytes;
286 }
287
288