fixed a bug in pxshm
[charm.git] / src / arch / gemini_gni / machine-persistent.c
1 /** @file
2  * Elan persistent communication
3  * @ingroup Machine
4 */
5
6 /*
7   included in machine.c
8   Gengbin Zheng, 9/6/2011
9 */
10
11 /*
12   machine specific persistent comm functions:
13   * LrtsSendPersistentMsg
14   * CmiSyncSendPersistent
15   * PumpPersistent
16   * PerAlloc PerFree      // persistent message memory allocation/free functions
17   * persist_machine_init  // machine specific initialization call
18 */
19
20
21 void LrtsSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
22 {
23   CmiAssert(h!=NULL);
24   PersistentSendsTable *slot = (PersistentSendsTable *)h;
25   CmiAssert(slot->used == 1);
26   CmiAssert(slot->destPE == destPE);
27   if (size > slot->sizeMax) {
28     CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
29     CmiAbort("Abort: Invalid size\n");
30   }
31
32 /*CmiPrintf("[%d] LrtsSendPersistentMsg h=%p hdl=%d destPE=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), destPE, slot->destAddress[0], size);*/
33
34   if (slot->destBuf[0].destAddress) {
35 #if 0
36     ELAN_EVENT *e1, *e2;
37     int strategy = STRATEGY_ONE_PUT;
38     /* if (size > 280) strategy = STRATEGY_TWO_ELANPUT; */
39     int *footer = (int*)((char*)m + size);
40     footer[0] = size;
41     footer[1] = 1;
42     if (strategy == STRATEGY_ONE_PUT) CMI_MESSAGE_SIZE(m) = size;
43     else CMI_MESSAGE_SIZE(m) = 0;
44     e1 = elan_put(elan_base->state, m, slot->destAddress[0], size+sizeof(int)*2, destPE);
45     switch (strategy ) {
46     case STRATEGY_ONE_PUT:
47     case STRATEGY_TWO_PUT:  {
48       PMSG_LIST *msg_tmp;
49       NEW_PMSG_LIST(e1, m, size, destPE, slot->destSizeAddress[0], h, strategy);
50       APPEND_PMSG_LIST(msg_tmp);
51       swapSendSlotBuffers(slot);
52       break;
53       }
54     case 2:
55       elan_wait(e1, ELAN_POLL_EVENT);
56       e2 = elan_put(elan_base->state, &size, slot->destSizeAddress[0], sizeof(int), destPE);
57       elan_wait(e2, ELAN_POLL_EVENT);
58       CMI_MESSAGE_SIZE(m) = 0;
59       /*CmiPrintf("[%d] elan finished. \n", CmiMyPe());*/
60       CmiFree(m);
61     }
62 #else
63      // uGNI part
64 #endif
65   }
66   else {
67 #if 1
68     if (slot->messageBuf != NULL) {
69       CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
70       CmiAbort("");
71     }
72     slot->messageBuf = m;
73     slot->messageSize = size;
74 #else
75     /* normal send */
76     PersistentHandle  *phs_tmp = phs;
77     int phsSize_tmp = phsSize;
78     phs = NULL; phsSize = 0;
79     CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
80     CmiSyncSendAndFree(slot->destPE, size, m);
81     phs = phs_tmp; phsSize = phsSize_tmp;
82 #endif
83   }
84 }
85
86 #if 0
87 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
88 {
89   CmiState cs = CmiGetState();
90   char *dupmsg = (char *) CmiAlloc(size);
91   memcpy(dupmsg, msg, size);
92
93   /*  CmiPrintf("Setting root to %d\n", 0); */
94   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
95
96   if (cs->pe==destPE) {
97     CQdCreate(CpvAccess(cQdState), 1);
98     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
99   }
100   else
101     LrtsSendPersistentMsg(h, destPE, size, dupmsg);
102 }
103 #endif
104
105 extern void CmiReference(void *blk);
106
107 #if 0
108
109 /* called in PumpMsgs */
110 int PumpPersistent()
111 {
112   int status = 0;
113   PersistentReceivesTable *slot = persistentReceivesTableHead;
114   while (slot) {
115     char *msg = slot->messagePtr[0];
116     int size = *(slot->recvSizePtr[0]);
117     if (size)
118     {
119       int *footer = (int*)(msg + size);
120       if (footer[0] == size && footer[1] == 1) {
121 /*CmiPrintf("[%d] PumpPersistent messagePtr=%p size:%d\n", CmiMyPe(), slot->messagePtr, size);*/
122
123 #if 0
124       void *dupmsg;
125       dupmsg = CmiAlloc(size);
126                                                                                 
127       _MEMCHECK(dupmsg);
128       memcpy(dupmsg, msg, size);
129       memset(msg, 0, size+2*sizeof(int));
130       msg = dupmsg;
131 #else
132       /* return messagePtr directly and user MUST make sure not to delete it. */
133       /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
134
135       CmiReference(msg);
136       swapRecvSlotBuffers(slot);
137 #endif
138
139       CmiPushPE(CMI_DEST_RANK(msg), msg);
140 #if CMK_BROADCAST_SPANNING_TREE
141       if (CMI_BROADCAST_ROOT(msg))
142           SendSpanningChildren(size, msg);
143 #endif
144       /* clear footer after message used */
145       *(slot->recvSizePtr[0]) = 0;
146       footer[0] = footer[1] = 0;
147
148 #if 0
149       /* not safe at all! */
150       /* instead of clear before use, do it earlier */
151       msg=slot->messagePtr[0];
152       size = *(slot->recvSizePtr[0]);
153       footer = (int*)(msg + size);
154       *(slot->recvSizePtr[0]) = 0;
155       footer[0] = footer[1] = 0;
156 #endif
157       status = 1;
158       }
159     }
160     slot = slot->next;
161   }
162   return status;
163 }
164
165 #endif
166
167 void *PerAlloc(int size)
168 {
169   return CmiAlloc(size);
170 }
171                                                                                 
172 void PerFree(char *msg)
173 {
174   //elan_CmiStaticFree(msg);
175   CmiFree(msg);
176 }
177
178 /* machine dependent init call */
179 void persist_machine_init(void)
180 {
181 }
182
183 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
184 {
185   int i;
186   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
187     char *buf = PerAlloc(maxBytes+sizeof(int)*2);
188     _MEMCHECK(buf);
189     memset(buf, 0, maxBytes+sizeof(int)*2);
190     slot->destBuf[i].destAddress = buf;
191     /* note: assume first integer in elan converse header is the msg size */
192     slot->destBuf[i].destSizeAddress = (unsigned int*)buf;
193     // slot->destBuf[i].mem_hdl = 0;
194   }
195   slot->sizeMax = maxBytes;
196 }
197
198