Implemented CkReduce to mimic CmiReduce but within the virtualized BigSim.
[charm.git] / src / conv-ccs / conv-ccs.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <errno.h>
11 #include <string.h>
12
13 #include "converse.h"
14 #include "conv-ccs.h"
15 #include "ccs-server.h"
16 #include "sockRoutines.h"
17 #include "queueing.h"
18
19 #if CMK_CCS_AVAILABLE
20
21 /*****************************************************************************
22  *
23  * Converse Client-Server Functions
24  *
25  *****************************************************************************/
26  
27 #include "ckhashtable.h"
28
29 /* Includes all information stored about a single CCS handler. */
30 typedef struct CcsHandlerRec {
31         const char *name; /*Name passed over socket*/
32         CmiHandler fnOld; /*Old converse-style handler, or NULL if new-style*/
33         CcsHandlerFn fn; /*New-style handler function, or NULL if old-style*/
34         void *userPtr;
35         CmiReduceMergeFn mergeFn; /*Merge function used for bcast requests*/
36         int nCalls; /* Number of times handler has been executed*/
37         CmiUInt2 redID; /*Reduction ID to be used with CmiListReduce*/
38 } CcsHandlerRec;
39
40 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
41   if (strlen(name)>=CCS_MAXHANDLER) 
42         CmiAbort("CCS handler names cannot exceed 32 characters");
43   c->name=strdup(name);
44   c->fn=NULL;
45   c->fnOld=NULL;
46   c->userPtr=NULL;
47   c->mergeFn=NULL;
48   c->nCalls=0;
49 }
50
51 static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
52         c->nCalls++;
53         if (c->fnOld) 
54         { /* Backward compatability version:
55             Pack user data into a converse message (cripes! why bother?);
56             user will delete the message. 
57           */
58                 char *cmsg = (char *) CmiAlloc(CmiReservedHeaderSize+reqLen);
59                 memcpy(cmsg+CmiReservedHeaderSize, reqData, reqLen);
60                 (c->fnOld)(cmsg);
61         }
62         else { /* Pass read-only copy of data straight to user */
63                 (c->fn)(c->userPtr, reqLen, reqData);
64         }
65 }
66
67 /*Table maps handler name to CcsHandler object*/
68 typedef CkHashtable_c CcsHandlerTable;
69 CpvStaticDeclare(CcsHandlerTable, ccsTab);
70
71 CpvStaticDeclare(CcsImplHeader*,ccsReq);/*Identifies CCS requestor (client)*/
72
73 void CcsRegisterHandler(const char *name, CmiHandler fn) {
74   CcsHandlerRec cp;
75   initHandlerRec(&cp,name);
76   cp.fnOld=fn;
77   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
78 }
79 void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
80   CcsHandlerRec cp;
81   initHandlerRec(&cp,name);
82   cp.fn=fn;
83   cp.userPtr=ptr;
84   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
85 }
86 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
87   CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
88   if (rec==NULL) {
89     CmiAbort("CCS: Unknown CCS handler name.\n");
90   }
91   rec->mergeFn=newMerge;
92   rec->redID=CmiGetGlobalReduction();
93 }
94
95 void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
96   CcsImplHeader *hdr;
97   int total = *size;
98   void *reply;
99   char *ptr;
100   int i;
101   for (i=0; i<n; ++i) {
102     hdr = (CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize);
103     total += ChMessageInt(hdr->len);
104   }
105   reply = CmiAlloc(total);
106   memcpy(reply, local, *size);
107   ((CcsImplHeader*)(((char*)reply)+CmiReservedHeaderSize))->len = ChMessageInt_new(total-CmiReservedHeaderSize-sizeof(CcsImplHeader));
108   CmiFree(local);
109   ptr = ((char*)reply)+*size;
110   for (i=0; i<n; ++i) {
111     int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize))->len);
112     memcpy(ptr, ((char*)remote[i])+CmiReservedHeaderSize+sizeof(CcsImplHeader), len);
113     ptr += len;
114   }
115   *size = total;
116   return reply;
117 }
118
119 #define SIMPLE_REDUCTION(name, dataType, loop) \
120 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
121   int i, m; \
122   CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiReservedHeaderSize); \
123   int lenLocal = ChMessageInt(hdrLocal->len); \
124   int nElem = lenLocal / sizeof(dataType); \
125   dataType *ret = (dataType *) (hdrLocal+1); \
126   CcsImplHeader *hdr; \
127   for (m=0; m<n; ++m) { \
128     int len; \
129     dataType *value; \
130     hdr = (CcsImplHeader*)(((char*)remote[m])+CmiReservedHeaderSize); \
131     len = ChMessageInt(hdr->len); \
132     value = (dataType *)(hdr+1); \
133     CmiAssert(lenLocal == len); \
134     for (i=0; i<nElem; ++i) loop; \
135   } \
136   return local; \
137 }
138
139 SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
140 SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
141 SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
142 SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
143
144 /*Use this macro for reductions that have the same type for all inputs */
145 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
146   SIMPLE_REDUCTION(nameBase##_int, int, loop) \
147   SIMPLE_REDUCTION(nameBase##_float, float, loop) \
148   SIMPLE_REDUCTION(nameBase##_double, double, loop)
149
150 SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
151 SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
152 SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
153 SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
154
155 #undef SIMPLE_REDUCTION
156 #undef SIMPLE_POLYMORPH_REDUCTION
157
158 int CcsEnabled(void)
159 {
160   return 1;
161 }
162
163 int CcsIsRemoteRequest(void)
164 {
165   return CpvAccess(ccsReq)!=NULL;
166 }
167
168 void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
169 {
170   *pip = CpvAccess(ccsReq)->attr.ip;
171   *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
172 }
173
174 static int rep_fw_handler_idx;
175
176 /**
177  * Decide if the reply is ready to be forwarded to the waiting client,
178  * or if combination is required (for broadcast/multicast CCS requests.
179  */
180 int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
181   int repPE = (int)ChMessageInt(rep->pe);
182   if (repPE <= -1) {
183     /* Reduce the message to get the final reply */
184     CcsHandlerRec *fn;
185     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+repLen;
186     char *msg=CmiAlloc(len);
187     char *r=msg+CmiReservedHeaderSize;
188     char *handlerStr;
189     rep->len = ChMessageInt_new(repLen);
190     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
191     memcpy(r,repData,repLen);
192     CmiSetHandler(msg,rep_fw_handler_idx);
193     handlerStr=rep->handler;
194     fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
195     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
196     if (repPE == -1) {
197       /* CCS Broadcast */
198       CkReduce(msg, len, fn->mergeFn);
199     } else {
200       /* CCS Multicast */
201       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
202     }
203   } else {
204     CcsImpl_reply(rep, repLen, repData);
205   }
206 }
207
208 CcsDelayedReply CcsDelayReply(void)
209 {
210   CcsDelayedReply ret;
211   int len = sizeof(CcsImplHeader);
212   if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
213     len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
214   ret.hdr = (CcsImplHeader*)malloc(len);
215   memcpy(ret.hdr, CpvAccess(ccsReq), len);
216   CpvAccess(ccsReq)=NULL;
217   return ret;
218 }
219
220 void CcsSendReply(int replyLen, const void *replyData)
221 {
222   if (CpvAccess(ccsReq)==NULL)
223     CmiAbort("CcsSendReply: reply already sent!\n");
224   CpvAccess(ccsReq)->len = ChMessageInt_new(1);
225   CcsReply(CpvAccess(ccsReq),replyLen,replyData);
226   CpvAccess(ccsReq) = NULL;
227 }
228
229 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
230 {
231   CcsImplHeader *h = d.hdr;
232   h->len=ChMessageInt_new(1);
233   CcsReply(h,replyLen,replyData);
234   free(h);
235 }
236
237 void CcsNoReply()
238 {
239   if (CpvAccess(ccsReq)==NULL) return;
240   CpvAccess(ccsReq)->len = ChMessageInt_new(0);
241   CcsReply(CpvAccess(ccsReq),0,NULL);
242   CpvAccess(ccsReq) = NULL;
243 }
244
245 void CcsNoDelayedReply(CcsDelayedReply d)
246 {
247   CcsImplHeader *h = d.hdr;
248   h->len = ChMessageInt_new(0);
249   CcsReply(h,0,NULL);
250   free(h);
251 }
252
253
254 /**********************************
255 _CCS Implementation Routines:
256   These do the request forwarding and
257 delivery.
258 ***********************************/
259
260 /*CCS Bottleneck:
261   Deliver the given message data to the given
262 CCS handler.
263 */
264 void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
265 {
266   char *cmsg;
267   int reqLen=ChMessageInt(hdr->len);
268 /*Look up handler's converse ID*/
269   char *handlerStr=hdr->handler;
270   CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
271   if (fn==NULL) {
272     CmiPrintf("CCS: Unknown CCS handler name '%s' requested. Ignoring...\n",
273               hdr->handler);
274     CpvAccess(ccsReq)=hdr;
275     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
276     return;
277  /*   CmiAbort("CCS: Unknown CCS handler name.\n");*/
278   }
279
280 /* Call the handler */
281   CpvAccess(ccsReq)=hdr;
282   callHandlerRec(fn,reqLen,reqData);
283   
284 /*Check if a reply was sent*/
285   if (CpvAccess(ccsReq)!=NULL)
286     CcsSendReply(0,NULL);/*Send an empty reply if not*/
287 }
288
289 #if ! NODE_0_IS_CONVHOST
290 /* The followings are necessary to prevent CCS requests to be processed before
291  * CCS has been initialized. Really it matters only when NODE_0_IS_CONVHOST=0, but
292  * it doesn't hurt having it in the other case as well */
293 static char **bufferedMessages = NULL;
294 static int CcsNumBufferedMsgs = 0;
295 #define CCS_MAX_NUM_BUFFERED_MSGS  100
296 #endif
297
298 void CcsBufferMessage(char *msg) {
299   //CmiPrintf("Buffering CCS message\n");
300   CmiAssert(CcsNumBufferedMsgs < CCS_MAX_NUM_BUFFERED_MSGS);
301   if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
302   if (bufferedMessages == NULL) bufferedMessages = malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
303   bufferedMessages[CcsNumBufferedMsgs] = msg;
304   CcsNumBufferedMsgs ++;
305 }
306   
307 /*Unpacks request message to call above routine*/
308 int _ccsHandlerIdx = 0;/*Converse handler index of routine req_fw_handler*/
309
310 #if CMK_BLUEGENE_CHARM
311 CpvDeclare(int, _bgCcsHandlerIdx);
312 CpvDeclare(int, _bgCcsAck);
313 /* This routine is needed when the application is built on top of the bigemulator
314  * layer of Charm. In this case, the real CCS handler must be called within a
315  * worker thread. The function of this function is to receive the CCS message in
316  * the bottom converse layer and forward it to the emulated layer. */
317 static void bg_req_fw_handler(char *msg) {
318   if (CpvAccess(_bgCcsAck) < BgNodeSize()) {
319     CcsBufferMessage(msg);
320     return;
321   }
322   /* Get out of the message who is the destination pe */
323   int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
324   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
325   int destPE = (int)ChMessageInt(hdr->pe);
326   if (destPE == -1) destPE = 0;
327   if (destPE < -1) {
328     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
329     destPE = ChMessageInt(pes_nbo[0]);
330   }
331   //CmiAssert(destPE >= 0); // FixME: should cover also broadcast and multicast -> create generic function to extract destpe
332   (((CmiBlueGeneMsgHeader*)msg)->tID) = 0;
333   (((CmiBlueGeneMsgHeader*)msg)->n) = 0;
334   (((CmiBlueGeneMsgHeader*)msg)->flag) = 0;
335   (((CmiBlueGeneMsgHeader*)msg)->t) = 0;
336   (((CmiBlueGeneMsgHeader*)msg)->hID) = CpvAccess(_bgCcsHandlerIdx);
337   /* Get the right thread to deliver to (for now assume it is using CyclicMapInfo) */
338   addBgNodeInbuffer_c(msg, destPE/CmiNumPes());
339   //CmiPrintf("message CCS added %d to %d\n",((CmiBlueGeneMsgHeader*)msg)->hID, ((CmiBlueGeneMsgHeader*)msg)->tID);
340 }
341 #define req_fw_handler bg_req_fw_handler
342 #endif
343 extern void req_fw_handler(char *msg);
344
345 void CcsReleaseMessages() {
346 #if ! NODE_0_IS_CONVHOST
347 #if CMK_BLUEGENE_CHARM
348   if (CpvAccess(_bgCcsAck) == 0 || CpvAccess(_bgCcsAck) < BgNodeSize()) return;
349 #endif
350   if (CcsNumBufferedMsgs > 0) {
351     int i;
352     for (i=0; i<CcsNumBufferedMsgs; ++i) {
353       CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
354       CmiPushPE(0, bufferedMessages[i]);
355     }
356     free(bufferedMessages);
357     bufferedMessages = NULL;
358     CcsNumBufferedMsgs = -1;
359   }
360 #endif
361 }
362
363 /*Convert CCS header & message data into a converse message 
364  addressed to handler*/
365 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
366 {
367   int reqLen=ChMessageInt(hdr->len);
368   int destPE = ChMessageInt(hdr->pe);
369   int len;
370   char *msg;
371   if (destPE < -1) reqLen -= destPE*sizeof(int);
372   len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+reqLen;
373   msg=(char *)CmiAlloc(len);
374   memcpy(msg+CmiReservedHeaderSize,hdr,sizeof(CcsImplHeader));
375   memcpy(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader),data,reqLen);
376   if (ret_len!=NULL) *ret_len=len;
377   if (_ccsHandlerIdx != 0) {
378     CmiSetHandler(msg, _ccsHandlerIdx);
379     return msg;
380   } else {
381 #if NODE_0_IS_CONVHOST
382     CmiAbort("Why do we need to buffer messages when node 0 is Convhost?");
383 #else
384     CcsBufferMessage(msg);
385     return NULL;
386 #endif
387   }
388 }
389
390 /*Receives reply messages passed up from
391 converse to node 0.*/
392 static void rep_fw_handler(char *msg)
393 {
394   int len;
395   char *r=msg+CmiReservedHeaderSize;
396   CcsImplHeader *hdr=(CcsImplHeader *)r; 
397   r+=sizeof(CcsImplHeader);
398   len=ChMessageInt(hdr->len);
399   CcsImpl_reply(hdr,len,r);
400   CmiFree(msg);
401 }
402
403 #if NODE_0_IS_CONVHOST
404 /************** NODE_0_IS_CONVHOST ***********
405 Non net- versions of charm++ are run without a 
406 (real) conv-host program.  This is fine, except 
407 CCS clients connect via conv-host; so for CCS
408 on non-net- versions of charm++, node 0 carries
409 out the CCS forwarding normally done in conv-host.
410
411 CCS works by listening to a TCP connection on a 
412 port-- the Ccs server socket.  A typical communcation
413 pattern is:
414
415 1.) Random program (CCS client) from the net
416 connects to the CCS server socket and sends
417 a CCS request.
418
419 2.) Node 0 forwards the request to the proper
420 PE as a regular converse message (built in CcsImpl_netReq)
421 for CcsHandleRequest.
422
423 3.) CcsHandleRequest looks up the user's pre-registered
424 CCS handler, and passes the user's handler the request data.
425
426 4.) The user's handler calls CcsSendReply with some
427 reply data; OR finishes without calling CcsSendReply,
428 in which case CcsHandleRequest does it.
429
430 5.) CcsSendReply forwards the reply back to node 0,
431 which sends the reply back to the original requestor,
432 on the (still-open) request socket.
433  */
434
435 /**
436 Send a Ccs reply back to the requestor, down the given socket.
437 Since there is no conv-host, node 0 does all the CCS 
438 communication-- this means all requests come to node 0
439 and are forwarded out; all replies are forwarded back to node 0.
440
441 Note: on Net- versions, CcsImpl_reply is implemented in machine.c
442 */
443 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
444 {
445   const int repPE=0;
446   rep->len=ChMessageInt_new(repLen);
447   if (CmiMyPe()==repPE) {
448     /*Actually deliver reply data*/
449     CcsServer_sendReply(rep,repLen,repData);
450   } else {
451     /*Forward data & socket # to the replyPE*/
452     int len=CmiReservedHeaderSize+
453            sizeof(CcsImplHeader)+repLen;
454     char *msg=CmiAlloc(len);
455     char *r=msg+CmiReservedHeaderSize;
456     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
457     memcpy(r,repData,repLen);
458     CmiSetHandler(msg,rep_fw_handler_idx);
459     CmiSyncSendAndFree(repPE,len,msg);
460   }
461 }
462
463 /*No request will be sent through this socket.
464 Closes it.
465 */
466 /*void CcsImpl_noReply(CcsImplHeader *hdr)
467 {
468   int fd=ChMessageInt(hdr->replyFd);
469   skt_close(fd);
470 }*/
471
472 /**
473  * This is the entrance point of a CCS request into the server.
474  * It is executed only on proc 0, and it forwards the request to the appropriate PE.
475  */
476 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
477 {
478   char *msg;
479   int len,repPE=ChMessageInt(hdr->pe);
480   if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
481     /*Treat out of bound values as errors. Helps detecting bugs*/
482     if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
483     else CmiPrintf("Invalid processor index in CCS request.");
484     CpvAccess(ccsReq)=hdr;
485     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
486     return;
487   }
488
489   msg=CcsImpl_ccs2converse(hdr,reqData,&len);
490   if (repPE >= 0) {
491     CmiSyncSendAndFree(repPE,len,msg);
492   } else if (repPE == -1) {
493     /* Broadcast to all processors */
494     CmiPushPE(0, msg);
495   } else {
496     /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
497     int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
498     CmiSyncSendAndFree(firstPE,len,msg);
499   }
500 }
501
502 /*
503 We have to run a CCS server socket here on
504 node 0.  To keep the speed impact minimal,
505 we only probe for new connections (with CcsServerCheck)
506 occasionally.  
507  */
508 #include <signal.h>
509 #include "ccs-server.c" /*Include implementation here in this case*/
510 #include "ccs-auth.c"
511
512 /*Check for ready Ccs messages:*/
513 void CcsServerCheck(void)
514 {
515   while (1==skt_select1(CcsServer_fd(),0)) {
516     CcsImplHeader hdr;
517     void *data;
518     /* printf("Got CCS connect...\n"); */
519     if (CcsServer_recvRequest(&hdr,&data))
520     {/*We got a network request*/
521       /* printf("Got CCS request...\n"); */
522       if (! check_stdio_header(&hdr)) {
523         CcsImpl_netRequest(&hdr,data);
524       }
525       free(data);
526     }
527   }
528 }
529
530 #endif /*NODE_0_IS_CONVHOST*/
531
532 int _isCcsHandlerIdx(int hIdx) {
533   if (hIdx==_ccsHandlerIdx) return 1;
534   if (hIdx==rep_fw_handler_idx) return 1;
535   return 0;
536 }
537
538 void CcsBuiltinsInit(char **argv);
539
540 CpvDeclare(int, cmiArgDebugFlag);
541 CpvDeclare(char *, displayArgument);
542 CpvDeclare(int, cpdSuspendStartup);
543
544 void CcsInit(char **argv)
545 {
546   CpvInitialize(CkHashtable_c, ccsTab);
547   CpvAccess(ccsTab) = CkCreateHashtable_string(sizeof(CcsHandlerRec),5);
548   CpvInitialize(CcsImplHeader *, ccsReq);
549   CpvAccess(ccsReq) = NULL;
550   _ccsHandlerIdx = CmiRegisterHandler((CmiHandler)req_fw_handler);
551 #if CMK_BLUEGENE_CHARM
552   CpvInitialize(int, _bgCcsHandlerIdx);
553   CpvAccess(_bgCcsHandlerIdx) = 0;
554   CpvInitialize(int, _bgCcsAck);
555   CpvAccess(_bgCcsAck) = 0;
556 #endif
557   CpvInitialize(int, cmiArgDebugFlag);
558   CpvInitialize(char *, displayArgument);
559   CpvInitialize(int, cpdSuspendStartup);
560   CpvAccess(cmiArgDebugFlag) = 0;
561   CpvAccess(displayArgument) = NULL;
562   CpvAccess(cpdSuspendStartup) = 0;
563   
564   CcsBuiltinsInit(argv);
565
566   rep_fw_handler_idx = CmiRegisterHandler((CmiHandler)rep_fw_handler);
567 #if NODE_0_IS_CONVHOST
568 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
569   print_fw_handler_idx = CmiRegisterHandler((CmiHandler)print_fw_handler);
570 #endif
571   {
572    int ccs_serverPort=0;
573    char *ccs_serverAuth=NULL;
574    
575    if (CmiGetArgFlagDesc(argv,"++server", "Create a CCS server port") | 
576       CmiGetArgIntDesc(argv,"++server-port",&ccs_serverPort, "Listen on this TCP/IP port number") |
577       CmiGetArgStringDesc(argv,"++server-auth",&ccs_serverAuth, "Use this CCS authentication file")) 
578     if (CmiMyPe()==0)
579     {/*Create and occasionally poll on a CCS server port*/
580       CcsServer_new(NULL,&ccs_serverPort,ccs_serverAuth);
581       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)CcsServerCheck,NULL);
582     }
583   }
584 #endif
585   /* if in parallel debug mode i.e ++cpd, freeze */
586   if (CmiGetArgFlagDesc(argv, "+cpd", "Used *only* in conjunction with parallel debugger"))
587   {
588      CpvAccess(cmiArgDebugFlag) = 1;
589      if (CmiGetArgStringDesc(argv, "+DebugDisplay",&(CpvAccess(displayArgument)), "X display for gdb used only in cpd mode"))
590      {
591         if (CpvAccess(displayArgument) == NULL)
592             CmiPrintf("WARNING> NULL parameter for +DebugDisplay\n***");
593      }
594      else if (CmiMyPe() == 0)
595      {
596             /* only one processor prints the warning */
597             CmiPrintf("WARNING> x term for gdb needs to be specified as +DebugDisplay by debugger\n***\n");
598      }
599
600      if (CmiGetArgFlagDesc(argv, "+DebugSuspend", "Suspend execution at beginning of program")) {
601        CpvAccess(cpdSuspendStartup) = 1;
602      }
603   }
604
605   CcsReleaseMessages();
606 }
607
608 #endif /*CMK_CCS_AVAILABLE*/
609