Wrong variable (not seen out of SMP builds)
[charm.git] / src / conv-ccs / conv-ccs.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <errno.h>
11 #include <string.h>
12
13 #include "converse.h"
14 #include "conv-ccs.h"
15 #include "ccs-server.h"
16 #include "sockRoutines.h"
17 #include "queueing.h"
18
19 #if CMK_CCS_AVAILABLE
20
21 /*****************************************************************************
22  *
23  * Converse Client-Server Functions
24  *
25  *****************************************************************************/
26  
27 #include "ckhashtable.h"
28
29 /* Includes all information stored about a single CCS handler. */
30 typedef struct CcsHandlerRec {
31         const char *name; /*Name passed over socket*/
32         CmiHandler fnOld; /*Old converse-style handler, or NULL if new-style*/
33         CcsHandlerFn fn; /*New-style handler function, or NULL if old-style*/
34         void *userPtr;
35         CmiReduceMergeFn mergeFn; /*Merge function used for bcast requests*/
36         int nCalls; /* Number of times handler has been executed*/
37         CmiUInt2 redID; /*Reduction ID to be used with CmiListReduce*/
38 } CcsHandlerRec;
39
40 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
41   if (strlen(name)>=CCS_MAXHANDLER) 
42         CmiAbort("CCS handler names cannot exceed 32 characters");
43   c->name=strdup(name);
44   c->fn=NULL;
45   c->fnOld=NULL;
46   c->userPtr=NULL;
47   c->mergeFn=NULL;
48   c->nCalls=0;
49 }
50
51 static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
52         c->nCalls++;
53         if (c->fnOld) 
54         { /* Backward compatability version:
55             Pack user data into a converse message (cripes! why bother?);
56             user will delete the message. 
57           */
58                 char *cmsg = (char *) CmiAlloc(CmiMsgHeaderSizeBytes+reqLen);
59                 memcpy(cmsg+CmiMsgHeaderSizeBytes, reqData, reqLen);
60                 (c->fnOld)(cmsg);
61         }
62         else { /* Pass read-only copy of data straight to user */
63                 (c->fn)(c->userPtr, reqLen, reqData);
64         }
65 }
66
67 /*Table maps handler name to CcsHandler object*/
68 typedef CkHashtable_c CcsHandlerTable;
69 CpvStaticDeclare(CcsHandlerTable, ccsTab);
70
71 CpvStaticDeclare(CcsImplHeader*,ccsReq);/*Identifies CCS requestor (client)*/
72
73 void CcsRegisterHandler(const char *name, CmiHandler fn) {
74   CcsHandlerRec cp;
75   initHandlerRec(&cp,name);
76   cp.fnOld=fn;
77   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
78 }
79 void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
80   CcsHandlerRec cp;
81   initHandlerRec(&cp,name);
82   cp.fn=fn;
83   cp.userPtr=ptr;
84   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
85 }
86 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
87   CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
88   if (rec==NULL) {
89     CmiAbort("CCS: Unknown CCS handler name.\n");
90   }
91   rec->mergeFn=newMerge;
92   rec->redID=CmiGetGlobalReduction();
93 }
94
95 void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
96   CcsImplHeader *hdr;
97   int total = *size;
98   void *reply;
99   char *ptr;
100   int i;
101   for (i=0; i<n; ++i) {
102     hdr = (CcsImplHeader*)(((char*)remote[i])+CmiMsgHeaderSizeBytes);
103     total += ChMessageInt(hdr->len);
104   }
105   reply = CmiAlloc(total);
106   memcpy(reply, local, *size);
107   ((CcsImplHeader*)(((char*)reply)+CmiMsgHeaderSizeBytes))->len = ChMessageInt_new(total-CmiMsgHeaderSizeBytes-sizeof(CcsImplHeader));
108   CmiFree(local);
109   ptr = ((char*)reply)+*size;
110   for (i=0; i<n; ++i) {
111     int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiMsgHeaderSizeBytes))->len);
112     memcpy(ptr, ((char*)remote[i])+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader), len);
113     ptr += len;
114   }
115   *size = total;
116   return reply;
117 }
118
119 #define SIMPLE_REDUCTION(name, dataType, loop) \
120 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
121   int i, m; \
122   CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiMsgHeaderSizeBytes); \
123   int lenLocal = ChMessageInt(hdrLocal->len); \
124   int nElem = lenLocal / sizeof(dataType); \
125   dataType *ret = (dataType *) (hdrLocal+1); \
126   CcsImplHeader *hdr; \
127   for (m=0; m<n; ++m) { \
128     int len; \
129     dataType *value; \
130     hdr = (CcsImplHeader*)(((char*)remote[m])+CmiMsgHeaderSizeBytes); \
131     len = ChMessageInt(hdr->len); \
132     value = (dataType *)(hdr+1); \
133     CmiAssert(lenLocal == len); \
134     for (i=0; i<nElem; ++i) loop; \
135   } \
136   return local; \
137 }
138
139 SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
140 SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
141 SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
142 SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
143
144 /*Use this macro for reductions that have the same type for all inputs */
145 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
146   SIMPLE_REDUCTION(nameBase##_int, int, loop) \
147   SIMPLE_REDUCTION(nameBase##_float, float, loop) \
148   SIMPLE_REDUCTION(nameBase##_double, double, loop)
149
150 SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
151 SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
152 SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
153 SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
154
155 #undef SIMPLE_REDUCTION
156 #undef SIMPLE_POLYMORPH_REDUCTION
157
158 int CcsEnabled(void)
159 {
160   return 1;
161 }
162
163 int CcsIsRemoteRequest(void)
164 {
165   return CpvAccess(ccsReq)!=NULL;
166 }
167
168 void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
169 {
170   *pip = CpvAccess(ccsReq)->attr.ip;
171   *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
172 }
173
174 static int rep_fw_handler_idx;
175
176 /**
177  * Decide if the reply is ready to be forwarded to the waiting client,
178  * or if combination is required (for broadcast/multicast CCS requests.
179  */
180 int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
181   int repPE = (int)ChMessageInt(rep->pe);
182   if (repPE <= -1) {
183     /* Reduce the message to get the final reply */
184     CcsHandlerRec *fn;
185     int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+repLen;
186     char *msg=CmiAlloc(len);
187     char *r=msg+CmiMsgHeaderSizeBytes;
188     char *handlerStr;
189     rep->len = ChMessageInt_new(repLen);
190     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
191     memcpy(r,repData,repLen);
192     CmiSetHandler(msg,rep_fw_handler_idx);
193     handlerStr=rep->handler;
194     fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
195     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
196     if (repPE == -1) {
197       /* CCS Broadcast */
198       CmiReduce(msg, len, fn->mergeFn);
199     } else {
200       /* CCS Multicast */
201       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
202     }
203   } else {
204     CcsImpl_reply(rep, repLen, repData);
205   }
206 }
207
208 CcsDelayedReply CcsDelayReply(void)
209 {
210   CcsDelayedReply ret;
211   int len = sizeof(CcsImplHeader);
212   if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
213     len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
214   ret.hdr = (CcsImplHeader*)malloc(len);
215   memcpy(ret.hdr, CpvAccess(ccsReq), len);
216   CpvAccess(ccsReq)=NULL;
217   return ret;
218 }
219
220 void CcsSendReply(int replyLen, const void *replyData)
221 {
222   if (CpvAccess(ccsReq)==NULL)
223     CmiAbort("CcsSendReply: reply already sent!\n");
224   CpvAccess(ccsReq)->len = ChMessageInt_new(1);
225   CcsReply(CpvAccess(ccsReq),replyLen,replyData);
226   CpvAccess(ccsReq) = NULL;
227 }
228
229 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
230 {
231   CcsImplHeader *h = d.hdr;
232   h->len=ChMessageInt_new(1);
233   CcsReply(h,replyLen,replyData);
234   free(h);
235 }
236
237 void CcsNoReply()
238 {
239   if (CpvAccess(ccsReq)==NULL) return;
240   CpvAccess(ccsReq)->len = ChMessageInt_new(0);
241   CcsReply(CpvAccess(ccsReq),0,NULL);
242   CpvAccess(ccsReq) = NULL;
243 }
244
245 void CcsNoDelayedReply(CcsDelayedReply d)
246 {
247   CcsImplHeader *h = d.hdr;
248   h->len = ChMessageInt_new(0);
249   CcsReply(h,0,NULL);
250   free(h);
251 }
252
253
254 /**********************************
255 _CCS Implementation Routines:
256   These do the request forwarding and
257 delivery.
258 ***********************************/
259
260 /*CCS Bottleneck:
261   Deliver the given message data to the given
262 CCS handler.
263 */
264 static void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
265 {
266   char *cmsg;
267   int reqLen=ChMessageInt(hdr->len);
268 /*Look up handler's converse ID*/
269   char *handlerStr=hdr->handler;
270   CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
271   if (fn==NULL) {
272     CmiPrintf("CCS: Unknown CCS handler name '%s' requested. Ignoring...\n",
273               hdr->handler);
274     CpvAccess(ccsReq)=hdr;
275     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
276     return;
277  /*   CmiAbort("CCS: Unknown CCS handler name.\n");*/
278   }
279
280 /* Call the handler */
281   CpvAccess(ccsReq)=hdr;
282   callHandlerRec(fn,reqLen,reqData);
283   
284 /*Check if a reply was sent*/
285   if (CpvAccess(ccsReq)!=NULL)
286     CcsSendReply(0,NULL);/*Send an empty reply if not*/
287 }
288
289 /*Unpacks request message to call above routine*/
290 int _ccsHandlerIdx;/*Converse handler index of below routine*/
291 static void req_fw_handler(char *msg)
292 {
293   int offset = CmiMsgHeaderSizeBytes + sizeof(CcsImplHeader);
294   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiMsgHeaderSizeBytes);
295   int destPE = (int)ChMessageInt(hdr->pe);
296   if (CmiMyPe() == 0 && destPE == -1) {
297     /* Broadcast message to all other processors */
298     int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+ChMessageInt(hdr->len);
299     CmiSyncBroadcast(len, msg);
300   }
301   else if (destPE < -1) {
302     /* Multicast the message to your children */
303     int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(ChMessageInt_t);
304     int index, child, i;
305     int *pes = (int*)(msg+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader));
306     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)pes;
307     offset -= destPE * sizeof(ChMessageInt_t);
308     if (ChMessageInt(pes_nbo[0]) == CmiMyPe()) {
309       for (index=0; index<-destPE; ++index) pes[index] = ChMessageInt(pes_nbo[index]);
310     }
311     for (index=0; index<-destPE; ++index) {
312       if (pes[index] == CmiMyPe()) break;
313     }
314     child = (index << 2) + 1;
315     for (i=0; i<4; ++i) {
316       if (child+i < -destPE) {
317         CmiSyncSend(pes[child+i], len, msg);
318       }
319     }
320   }
321   CcsHandleRequest(hdr, msg+offset);
322   CmiFree(msg);
323 }
324
325 /*Convert CCS header & message data into a converse message 
326  addressed to handler*/
327 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
328 {
329   int reqLen=ChMessageInt(hdr->len);
330   int destPE = ChMessageInt(hdr->pe);
331   int len;
332   char *msg;
333   if (destPE < -1) reqLen -= destPE*sizeof(int);
334   len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+reqLen;
335   msg=(char *)CmiAlloc(len);
336   memcpy(msg+CmiMsgHeaderSizeBytes,hdr,sizeof(CcsImplHeader));
337   memcpy(msg+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader),data,reqLen);
338   CmiSetHandler(msg, _ccsHandlerIdx);
339   if (ret_len!=NULL) *ret_len=len;
340   return msg;
341 }
342
343 /*Receives reply messages passed up from
344 converse to node 0.*/
345 static void rep_fw_handler(char *msg)
346 {
347   int len;
348   char *r=msg+CmiMsgHeaderSizeBytes;
349   CcsImplHeader *hdr=(CcsImplHeader *)r; 
350   r+=sizeof(CcsImplHeader);
351   len=ChMessageInt(hdr->len);
352   CcsImpl_reply(hdr,len,r);
353   CmiFree(msg);
354 }
355
356 #if NODE_0_IS_CONVHOST
357 /************** NODE_0_IS_CONVHOST ***********
358 Non net- versions of charm++ are run without a 
359 (real) conv-host program.  This is fine, except 
360 CCS clients connect via conv-host; so for CCS
361 on non-net- versions of charm++, node 0 carries
362 out the CCS forwarding normally done in conv-host.
363
364 CCS works by listening to a TCP connection on a 
365 port-- the Ccs server socket.  A typical communcation
366 pattern is:
367
368 1.) Random program (CCS client) from the net
369 connects to the CCS server socket and sends
370 a CCS request.
371
372 2.) Node 0 forwards the request to the proper
373 PE as a regular converse message (built in CcsImpl_netReq)
374 for CcsHandleRequest.
375
376 3.) CcsHandleRequest looks up the user's pre-registered
377 CCS handler, and passes the user's handler the request data.
378
379 4.) The user's handler calls CcsSendReply with some
380 reply data; OR finishes without calling CcsSendReply,
381 in which case CcsHandleRequest does it.
382
383 5.) CcsSendReply forwards the reply back to node 0,
384 which sends the reply back to the original requestor,
385 on the (still-open) request socket.
386  */
387
388 /**
389 Send a Ccs reply back to the requestor, down the given socket.
390 Since there is no conv-host, node 0 does all the CCS 
391 communication-- this means all requests come to node 0
392 and are forwarded out; all replies are forwarded back to node 0.
393
394 Note: on Net- versions, CcsImpl_reply is implemented in machine.c
395 */
396 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
397 {
398   const int repPE=0;
399   rep->len=ChMessageInt_new(repLen);
400   if (CmiMyPe()==repPE) {
401     /*Actually deliver reply data*/
402     CcsServer_sendReply(rep,repLen,repData);
403   } else {
404     /*Forward data & socket # to the replyPE*/
405     int len=CmiMsgHeaderSizeBytes+
406            sizeof(CcsImplHeader)+repLen;
407     char *msg=CmiAlloc(len);
408     char *r=msg+CmiMsgHeaderSizeBytes;
409     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
410     memcpy(r,repData,repLen);
411     CmiSetHandler(msg,rep_fw_handler_idx);
412     CmiSyncSendAndFree(repPE,len,msg);
413   }
414 }
415
416 /*No request will be sent through this socket.
417 Closes it.
418 */
419 /*void CcsImpl_noReply(CcsImplHeader *hdr)
420 {
421   int fd=ChMessageInt(hdr->replyFd);
422   skt_close(fd);
423 }*/
424
425 /**
426  * This is the entrance point of a CCS request into the server.
427  * It is executed only on proc 0, and it forwards the request to the appropriate PE.
428  */
429 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
430 {
431   char *msg;
432   int len,repPE=ChMessageInt(hdr->pe);
433   if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
434     /*Treat out of bound values as errors. Helps detecting bugs*/
435     if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
436     else CmiPrintf("Invalid processor index in CCS request.");
437     CpvAccess(ccsReq)=hdr;
438     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
439     return;
440   }
441
442   msg=CcsImpl_ccs2converse(hdr,reqData,&len);
443   if (repPE >= 0) {
444     CmiSyncSendAndFree(repPE,len,msg);
445   } else if (repPE == -1) {
446     /* Broadcast to all processors */
447     CmiPushPE(0, msg);
448   } else {
449     /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
450     int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
451     CmiSyncSendAndFree(firstPE,len,msg);
452   }
453 }
454
455 /*
456 We have to run a CCS server socket here on
457 node 0.  To keep the speed impact minimal,
458 we only probe for new connections (with CcsServerCheck)
459 occasionally.  
460  */
461 #include <signal.h>
462 #include "ccs-server.c" /*Include implementation here in this case*/
463 #include "ccs-auth.c"
464
465 /*Check for ready Ccs messages:*/
466 void CcsServerCheck(void)
467 {
468   while (1==skt_select1(CcsServer_fd(),0)) {
469     CcsImplHeader hdr;
470     void *data;
471     /* printf("Got CCS connect...\n"); */
472     if (CcsServer_recvRequest(&hdr,&data))
473     {/*We got a network request*/
474       /* printf("Got CCS request...\n"); */
475       if (! check_stdio_header(&hdr)) {
476         CcsImpl_netRequest(&hdr,data);
477       }
478       free(data);
479     }
480   }
481 }
482
483 #endif /*NODE_0_IS_CONVHOST*/
484
485 int _isCcsHandlerIdx(int hIdx) {
486   if (hIdx==_ccsHandlerIdx) return 1;
487   if (hIdx==rep_fw_handler_idx) return 1;
488   return 0;
489 }
490
491 void CcsBuiltinsInit(char **argv);
492
493 CpvDeclare(int, cmiArgDebugFlag);
494 CpvDeclare(char *, displayArgument);
495 CpvDeclare(int, cpdSuspendStartup);
496
497 void CcsInit(char **argv)
498 {
499   CpvInitialize(CkHashtable_c, ccsTab);
500   CpvAccess(ccsTab) = CkCreateHashtable_string(sizeof(CcsHandlerRec),5);
501   CpvInitialize(CcsImplHeader *, ccsReq);
502   CpvAccess(ccsReq) = NULL;
503   _ccsHandlerIdx = CmiRegisterHandler((CmiHandler)req_fw_handler);
504   CpvInitialize(int, cmiArgDebugFlag);
505   CpvInitialize(char *, displayArgument);
506   CpvInitialize(int, cpdSuspendStartup);
507   CpvAccess(cmiArgDebugFlag) = 0;
508   CpvAccess(displayArgument) = NULL;
509   CpvAccess(cpdSuspendStartup) = 0;
510   
511   CcsBuiltinsInit(argv);
512
513   rep_fw_handler_idx = CmiRegisterHandler((CmiHandler)rep_fw_handler);
514 #if NODE_0_IS_CONVHOST
515 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
516   print_fw_handler_idx = CmiRegisterHandler((CmiHandler)print_fw_handler);
517 #endif
518   {
519    int ccs_serverPort=0;
520    char *ccs_serverAuth=NULL;
521    
522    if (CmiGetArgFlagDesc(argv,"++server", "Create a CCS server port") | 
523       CmiGetArgIntDesc(argv,"++server-port",&ccs_serverPort, "Listen on this TCP/IP port number") |
524       CmiGetArgStringDesc(argv,"++server-auth",&ccs_serverAuth, "Use this CCS authentication file")) 
525     if (CmiMyPe()==0)
526     {/*Create and occasionally poll on a CCS server port*/
527       CcsServer_new(NULL,&ccs_serverPort,ccs_serverAuth);
528       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)CcsServerCheck,NULL);
529     }
530   }
531 #endif
532   /* if in parallel debug mode i.e ++cpd, freeze */
533   if (CmiGetArgFlagDesc(argv, "+cpd", "Used *only* in conjunction with parallel debugger"))
534   {
535      CpvAccess(cmiArgDebugFlag) = 1;
536      if (CmiGetArgStringDesc(argv, "+DebugDisplay",&(CpvAccess(displayArgument)), "X display for gdb used only in cpd mode"))
537      {
538         if (CpvAccess(displayArgument) == NULL)
539             CmiPrintf("WARNING> NULL parameter for +DebugDisplay\n***");
540      }
541      else if (CmiMyPe() == 0)
542      {
543             /* only one processor prints the warning */
544             CmiPrintf("WARNING> x term for gdb needs to be specified as +DebugDisplay by debugger\n***\n");
545      }
546
547      if (CmiGetArgFlagDesc(argv, "+DebugSuspend", "Suspend execution at beginning of program")) {
548        CpvAccess(cpdSuspendStartup) = 1;
549      }
550   }
551 }
552
553 #endif /*CMK_CCS_AVAILABLE*/
554