Preparing to support bigemulator CCS:
[charm.git] / src / conv-ccs / conv-ccs.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <errno.h>
11 #include <string.h>
12
13 #include "converse.h"
14 #include "conv-ccs.h"
15 #include "ccs-server.h"
16 #include "sockRoutines.h"
17 #include "queueing.h"
18
19 #if CMK_CCS_AVAILABLE
20
21 /*****************************************************************************
22  *
23  * Converse Client-Server Functions
24  *
25  *****************************************************************************/
26  
27 #include "ckhashtable.h"
28
29 /* Includes all information stored about a single CCS handler. */
30 typedef struct CcsHandlerRec {
31         const char *name; /*Name passed over socket*/
32         CmiHandler fnOld; /*Old converse-style handler, or NULL if new-style*/
33         CcsHandlerFn fn; /*New-style handler function, or NULL if old-style*/
34         void *userPtr;
35         CmiReduceMergeFn mergeFn; /*Merge function used for bcast requests*/
36         int nCalls; /* Number of times handler has been executed*/
37         CmiUInt2 redID; /*Reduction ID to be used with CmiListReduce*/
38 } CcsHandlerRec;
39
40 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
41   if (strlen(name)>=CCS_MAXHANDLER) 
42         CmiAbort("CCS handler names cannot exceed 32 characters");
43   c->name=strdup(name);
44   c->fn=NULL;
45   c->fnOld=NULL;
46   c->userPtr=NULL;
47   c->mergeFn=NULL;
48   c->nCalls=0;
49 }
50
51 static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
52         c->nCalls++;
53         if (c->fnOld) 
54         { /* Backward compatability version:
55             Pack user data into a converse message (cripes! why bother?);
56             user will delete the message. 
57           */
58                 char *cmsg = (char *) CmiAlloc(CmiReservedHeaderSize+reqLen);
59                 memcpy(cmsg+CmiReservedHeaderSize, reqData, reqLen);
60                 (c->fnOld)(cmsg);
61         }
62         else { /* Pass read-only copy of data straight to user */
63                 (c->fn)(c->userPtr, reqLen, reqData);
64         }
65 }
66
67 /*Table maps handler name to CcsHandler object*/
68 typedef CkHashtable_c CcsHandlerTable;
69 CpvStaticDeclare(CcsHandlerTable, ccsTab);
70
71 CpvStaticDeclare(CcsImplHeader*,ccsReq);/*Identifies CCS requestor (client)*/
72
73 void CcsRegisterHandler(const char *name, CmiHandler fn) {
74   CcsHandlerRec cp;
75   initHandlerRec(&cp,name);
76   cp.fnOld=fn;
77   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
78 }
79 void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
80   CcsHandlerRec cp;
81   initHandlerRec(&cp,name);
82   cp.fn=fn;
83   cp.userPtr=ptr;
84   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
85 }
86 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
87   CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
88   if (rec==NULL) {
89     CmiAbort("CCS: Unknown CCS handler name.\n");
90   }
91   rec->mergeFn=newMerge;
92   rec->redID=CmiGetGlobalReduction();
93 }
94
95 void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
96   CcsImplHeader *hdr;
97   int total = *size;
98   void *reply;
99   char *ptr;
100   int i;
101   for (i=0; i<n; ++i) {
102     hdr = (CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize);
103     total += ChMessageInt(hdr->len);
104   }
105   reply = CmiAlloc(total);
106   memcpy(reply, local, *size);
107   ((CcsImplHeader*)(((char*)reply)+CmiReservedHeaderSize))->len = ChMessageInt_new(total-CmiReservedHeaderSize-sizeof(CcsImplHeader));
108   CmiFree(local);
109   ptr = ((char*)reply)+*size;
110   for (i=0; i<n; ++i) {
111     int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize))->len);
112     memcpy(ptr, ((char*)remote[i])+CmiReservedHeaderSize+sizeof(CcsImplHeader), len);
113     ptr += len;
114   }
115   *size = total;
116   return reply;
117 }
118
119 #define SIMPLE_REDUCTION(name, dataType, loop) \
120 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
121   int i, m; \
122   CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiReservedHeaderSize); \
123   int lenLocal = ChMessageInt(hdrLocal->len); \
124   int nElem = lenLocal / sizeof(dataType); \
125   dataType *ret = (dataType *) (hdrLocal+1); \
126   CcsImplHeader *hdr; \
127   for (m=0; m<n; ++m) { \
128     int len; \
129     dataType *value; \
130     hdr = (CcsImplHeader*)(((char*)remote[m])+CmiReservedHeaderSize); \
131     len = ChMessageInt(hdr->len); \
132     value = (dataType *)(hdr+1); \
133     CmiAssert(lenLocal == len); \
134     for (i=0; i<nElem; ++i) loop; \
135   } \
136   return local; \
137 }
138
139 SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
140 SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
141 SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
142 SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
143
144 /*Use this macro for reductions that have the same type for all inputs */
145 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
146   SIMPLE_REDUCTION(nameBase##_int, int, loop) \
147   SIMPLE_REDUCTION(nameBase##_float, float, loop) \
148   SIMPLE_REDUCTION(nameBase##_double, double, loop)
149
150 SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
151 SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
152 SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
153 SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
154
155 #undef SIMPLE_REDUCTION
156 #undef SIMPLE_POLYMORPH_REDUCTION
157
158 int CcsEnabled(void)
159 {
160   return 1;
161 }
162
163 int CcsIsRemoteRequest(void)
164 {
165   return CpvAccess(ccsReq)!=NULL;
166 }
167
168 void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
169 {
170   *pip = CpvAccess(ccsReq)->attr.ip;
171   *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
172 }
173
174 static int rep_fw_handler_idx;
175
176 /**
177  * Decide if the reply is ready to be forwarded to the waiting client,
178  * or if combination is required (for broadcast/multicast CCS requests.
179  */
180 int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
181   int repPE = (int)ChMessageInt(rep->pe);
182   if (repPE <= -1) {
183     /* Reduce the message to get the final reply */
184     CcsHandlerRec *fn;
185     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+repLen;
186     char *msg=CmiAlloc(len);
187     char *r=msg+CmiReservedHeaderSize;
188     char *handlerStr;
189     rep->len = ChMessageInt_new(repLen);
190     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
191     memcpy(r,repData,repLen);
192     CmiSetHandler(msg,rep_fw_handler_idx);
193     handlerStr=rep->handler;
194     fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
195     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
196     if (repPE == -1) {
197       /* CCS Broadcast */
198       CmiReduce(msg, len, fn->mergeFn);
199     } else {
200       /* CCS Multicast */
201       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
202     }
203   } else {
204     CcsImpl_reply(rep, repLen, repData);
205   }
206 }
207
208 CcsDelayedReply CcsDelayReply(void)
209 {
210   CcsDelayedReply ret;
211   int len = sizeof(CcsImplHeader);
212   if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
213     len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
214   ret.hdr = (CcsImplHeader*)malloc(len);
215   memcpy(ret.hdr, CpvAccess(ccsReq), len);
216   CpvAccess(ccsReq)=NULL;
217   return ret;
218 }
219
220 void CcsSendReply(int replyLen, const void *replyData)
221 {
222   if (CpvAccess(ccsReq)==NULL)
223     CmiAbort("CcsSendReply: reply already sent!\n");
224   CpvAccess(ccsReq)->len = ChMessageInt_new(1);
225   CcsReply(CpvAccess(ccsReq),replyLen,replyData);
226   CpvAccess(ccsReq) = NULL;
227 }
228
229 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
230 {
231   CcsImplHeader *h = d.hdr;
232   h->len=ChMessageInt_new(1);
233   CcsReply(h,replyLen,replyData);
234   free(h);
235 }
236
237 void CcsNoReply()
238 {
239   if (CpvAccess(ccsReq)==NULL) return;
240   CpvAccess(ccsReq)->len = ChMessageInt_new(0);
241   CcsReply(CpvAccess(ccsReq),0,NULL);
242   CpvAccess(ccsReq) = NULL;
243 }
244
245 void CcsNoDelayedReply(CcsDelayedReply d)
246 {
247   CcsImplHeader *h = d.hdr;
248   h->len = ChMessageInt_new(0);
249   CcsReply(h,0,NULL);
250   free(h);
251 }
252
253
254 /**********************************
255 _CCS Implementation Routines:
256   These do the request forwarding and
257 delivery.
258 ***********************************/
259
260 /*CCS Bottleneck:
261   Deliver the given message data to the given
262 CCS handler.
263 */
264 void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
265 {
266   char *cmsg;
267   int reqLen=ChMessageInt(hdr->len);
268 /*Look up handler's converse ID*/
269   char *handlerStr=hdr->handler;
270   CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
271   if (fn==NULL) {
272     CmiPrintf("CCS: Unknown CCS handler name '%s' requested. Ignoring...\n",
273               hdr->handler);
274     CpvAccess(ccsReq)=hdr;
275     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
276     return;
277  /*   CmiAbort("CCS: Unknown CCS handler name.\n");*/
278   }
279
280 /* Call the handler */
281   CpvAccess(ccsReq)=hdr;
282   callHandlerRec(fn,reqLen,reqData);
283   
284 /*Check if a reply was sent*/
285   if (CpvAccess(ccsReq)!=NULL)
286     CcsSendReply(0,NULL);/*Send an empty reply if not*/
287 }
288
289 /*Unpacks request message to call above routine*/
290 int _ccsHandlerIdx = 0;/*Converse handler index of below routine*/
291 #if ! NODE_0_IS_CONVHOST
292 /* The followings are necessary to prevent CCS requests to be processed before
293  * CCS has been initialized. Really it matters only when NODE_0_IS_CONVHOST=0, but
294  * it doesn't hurt having it in the other case as well */
295 static char **bufferedMessages = NULL;
296 static int CcsNumBufferedMsgs = 0;
297 #define CCS_MAX_NUM_BUFFERED_MSGS  100
298 #endif
299
300 void CcsBufferMessage(char *msg) {
301   //CmiPrintf("Buffering CCS message\n");
302   CmiAssert(CcsNumBufferedMsgs < CCS_MAX_NUM_BUFFERED_MSGS);
303   if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
304   if (bufferedMessages == NULL) bufferedMessages = malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
305   bufferedMessages[CcsNumBufferedMsgs] = msg;
306   CcsNumBufferedMsgs ++;
307 }
308   
309 extern void req_fw_handler(char *msg);
310
311 void CcsReleaseMessages() {
312 #if ! NODE_0_IS_CONVHOST
313   if (CcsNumBufferedMsgs > 0) {
314     int i;
315     for (i=0; i<CcsNumBufferedMsgs; ++i) {
316       CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
317       CmiPushPE(0, bufferedMessages[i]);
318     }
319     free(bufferedMessages);
320     bufferedMessages = NULL;
321     CcsNumBufferedMsgs = -1;
322   }
323 #endif
324 }
325
326 /*Convert CCS header & message data into a converse message 
327  addressed to handler*/
328 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
329 {
330   int reqLen=ChMessageInt(hdr->len);
331   int destPE = ChMessageInt(hdr->pe);
332   int len;
333   char *msg;
334   if (destPE < -1) reqLen -= destPE*sizeof(int);
335   len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+reqLen;
336   msg=(char *)CmiAlloc(len);
337   memcpy(msg+CmiReservedHeaderSize,hdr,sizeof(CcsImplHeader));
338   memcpy(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader),data,reqLen);
339   if (ret_len!=NULL) *ret_len=len;
340   if (_ccsHandlerIdx != 0) {
341     CmiSetHandler(msg, _ccsHandlerIdx);
342     return msg;
343   } else {
344 #if NODE_0_IS_CONVHOST
345     CmiAbort("Why do we need to buffer messages when node 0 is Convhost?");
346 #else
347     CcsBufferMessage(msg);
348     return NULL;
349 #endif
350   }
351 }
352
353 /*Receives reply messages passed up from
354 converse to node 0.*/
355 static void rep_fw_handler(char *msg)
356 {
357   int len;
358   char *r=msg+CmiReservedHeaderSize;
359   CcsImplHeader *hdr=(CcsImplHeader *)r; 
360   r+=sizeof(CcsImplHeader);
361   len=ChMessageInt(hdr->len);
362   CcsImpl_reply(hdr,len,r);
363   CmiFree(msg);
364 }
365
366 #if NODE_0_IS_CONVHOST
367 /************** NODE_0_IS_CONVHOST ***********
368 Non net- versions of charm++ are run without a 
369 (real) conv-host program.  This is fine, except 
370 CCS clients connect via conv-host; so for CCS
371 on non-net- versions of charm++, node 0 carries
372 out the CCS forwarding normally done in conv-host.
373
374 CCS works by listening to a TCP connection on a 
375 port-- the Ccs server socket.  A typical communcation
376 pattern is:
377
378 1.) Random program (CCS client) from the net
379 connects to the CCS server socket and sends
380 a CCS request.
381
382 2.) Node 0 forwards the request to the proper
383 PE as a regular converse message (built in CcsImpl_netReq)
384 for CcsHandleRequest.
385
386 3.) CcsHandleRequest looks up the user's pre-registered
387 CCS handler, and passes the user's handler the request data.
388
389 4.) The user's handler calls CcsSendReply with some
390 reply data; OR finishes without calling CcsSendReply,
391 in which case CcsHandleRequest does it.
392
393 5.) CcsSendReply forwards the reply back to node 0,
394 which sends the reply back to the original requestor,
395 on the (still-open) request socket.
396  */
397
398 /**
399 Send a Ccs reply back to the requestor, down the given socket.
400 Since there is no conv-host, node 0 does all the CCS 
401 communication-- this means all requests come to node 0
402 and are forwarded out; all replies are forwarded back to node 0.
403
404 Note: on Net- versions, CcsImpl_reply is implemented in machine.c
405 */
406 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
407 {
408   const int repPE=0;
409   rep->len=ChMessageInt_new(repLen);
410   if (CmiMyPe()==repPE) {
411     /*Actually deliver reply data*/
412     CcsServer_sendReply(rep,repLen,repData);
413   } else {
414     /*Forward data & socket # to the replyPE*/
415     int len=CmiReservedHeaderSize+
416            sizeof(CcsImplHeader)+repLen;
417     char *msg=CmiAlloc(len);
418     char *r=msg+CmiReservedHeaderSize;
419     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
420     memcpy(r,repData,repLen);
421     CmiSetHandler(msg,rep_fw_handler_idx);
422     CmiSyncSendAndFree(repPE,len,msg);
423   }
424 }
425
426 /*No request will be sent through this socket.
427 Closes it.
428 */
429 /*void CcsImpl_noReply(CcsImplHeader *hdr)
430 {
431   int fd=ChMessageInt(hdr->replyFd);
432   skt_close(fd);
433 }*/
434
435 /**
436  * This is the entrance point of a CCS request into the server.
437  * It is executed only on proc 0, and it forwards the request to the appropriate PE.
438  */
439 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
440 {
441   char *msg;
442   int len,repPE=ChMessageInt(hdr->pe);
443   if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
444     /*Treat out of bound values as errors. Helps detecting bugs*/
445     if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
446     else CmiPrintf("Invalid processor index in CCS request.");
447     CpvAccess(ccsReq)=hdr;
448     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
449     return;
450   }
451
452   msg=CcsImpl_ccs2converse(hdr,reqData,&len);
453   if (repPE >= 0) {
454     CmiSyncSendAndFree(repPE,len,msg);
455   } else if (repPE == -1) {
456     /* Broadcast to all processors */
457     CmiPushPE(0, msg);
458   } else {
459     /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
460     int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
461     CmiSyncSendAndFree(firstPE,len,msg);
462   }
463 }
464
465 /*
466 We have to run a CCS server socket here on
467 node 0.  To keep the speed impact minimal,
468 we only probe for new connections (with CcsServerCheck)
469 occasionally.  
470  */
471 #include <signal.h>
472 #include "ccs-server.c" /*Include implementation here in this case*/
473 #include "ccs-auth.c"
474
475 /*Check for ready Ccs messages:*/
476 void CcsServerCheck(void)
477 {
478   while (1==skt_select1(CcsServer_fd(),0)) {
479     CcsImplHeader hdr;
480     void *data;
481     /* printf("Got CCS connect...\n"); */
482     if (CcsServer_recvRequest(&hdr,&data))
483     {/*We got a network request*/
484       /* printf("Got CCS request...\n"); */
485       if (! check_stdio_header(&hdr)) {
486         CcsImpl_netRequest(&hdr,data);
487       }
488       free(data);
489     }
490   }
491 }
492
493 #endif /*NODE_0_IS_CONVHOST*/
494
495 int _isCcsHandlerIdx(int hIdx) {
496   if (hIdx==_ccsHandlerIdx) return 1;
497   if (hIdx==rep_fw_handler_idx) return 1;
498   return 0;
499 }
500
501 void CcsBuiltinsInit(char **argv);
502
503 CpvDeclare(int, cmiArgDebugFlag);
504 CpvDeclare(char *, displayArgument);
505 CpvDeclare(int, cpdSuspendStartup);
506
507 void CcsInit(char **argv)
508 {
509   CpvInitialize(CkHashtable_c, ccsTab);
510   CpvAccess(ccsTab) = CkCreateHashtable_string(sizeof(CcsHandlerRec),5);
511   CpvInitialize(CcsImplHeader *, ccsReq);
512   CpvAccess(ccsReq) = NULL;
513   _ccsHandlerIdx = CmiRegisterHandler((CmiHandler)req_fw_handler);
514   CpvInitialize(int, cmiArgDebugFlag);
515   CpvInitialize(char *, displayArgument);
516   CpvInitialize(int, cpdSuspendStartup);
517   CpvAccess(cmiArgDebugFlag) = 0;
518   CpvAccess(displayArgument) = NULL;
519   CpvAccess(cpdSuspendStartup) = 0;
520   
521   CcsBuiltinsInit(argv);
522
523   rep_fw_handler_idx = CmiRegisterHandler((CmiHandler)rep_fw_handler);
524 #if NODE_0_IS_CONVHOST
525 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
526   print_fw_handler_idx = CmiRegisterHandler((CmiHandler)print_fw_handler);
527 #endif
528   {
529    int ccs_serverPort=0;
530    char *ccs_serverAuth=NULL;
531    
532    if (CmiGetArgFlagDesc(argv,"++server", "Create a CCS server port") | 
533       CmiGetArgIntDesc(argv,"++server-port",&ccs_serverPort, "Listen on this TCP/IP port number") |
534       CmiGetArgStringDesc(argv,"++server-auth",&ccs_serverAuth, "Use this CCS authentication file")) 
535     if (CmiMyPe()==0)
536     {/*Create and occasionally poll on a CCS server port*/
537       CcsServer_new(NULL,&ccs_serverPort,ccs_serverAuth);
538       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)CcsServerCheck,NULL);
539     }
540   }
541 #endif
542   /* if in parallel debug mode i.e ++cpd, freeze */
543   if (CmiGetArgFlagDesc(argv, "+cpd", "Used *only* in conjunction with parallel debugger"))
544   {
545      CpvAccess(cmiArgDebugFlag) = 1;
546      if (CmiGetArgStringDesc(argv, "+DebugDisplay",&(CpvAccess(displayArgument)), "X display for gdb used only in cpd mode"))
547      {
548         if (CpvAccess(displayArgument) == NULL)
549             CmiPrintf("WARNING> NULL parameter for +DebugDisplay\n***");
550      }
551      else if (CmiMyPe() == 0)
552      {
553             /* only one processor prints the warning */
554             CmiPrintf("WARNING> x term for gdb needs to be specified as +DebugDisplay by debugger\n***\n");
555      }
556
557      if (CmiGetArgFlagDesc(argv, "+DebugSuspend", "Suspend execution at beginning of program")) {
558        CpvAccess(cpdSuspendStartup) = 1;
559      }
560   }
561
562   CcsReleaseMessages();
563 }
564
565 #endif /*CMK_CCS_AVAILABLE*/
566