03e6c6d76877352a7f64a33168383e68de5b93c7
[charm.git] / src / conv-ccs / conv-ccs.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <errno.h>
11 #include <string.h>
12
13 #include "converse.h"
14 #include "conv-ccs.h"
15 #include "ccs-server.h"
16 #include "sockRoutines.h"
17 #include "queueing.h"
18
19 #if CMK_CCS_AVAILABLE
20
21 /*****************************************************************************
22  *
23  * Converse Client-Server Functions
24  *
25  *****************************************************************************/
26  
27 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
28   if (strlen(name)>=CCS_MAXHANDLER) 
29         CmiAbort("CCS handler names cannot exceed 32 characters");
30   c->name=strdup(name);
31   c->fn=NULL;
32   c->fnOld=NULL;
33   c->userPtr=NULL;
34   c->mergeFn=NULL;
35   c->nCalls=0;
36 }
37
38 static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
39         c->nCalls++;
40         if (c->fnOld) 
41         { /* Backward compatability version:
42             Pack user data into a converse message (cripes! why bother?);
43             user will delete the message. 
44           */
45                 char *cmsg = (char *) CmiAlloc(CmiReservedHeaderSize+reqLen);
46                 memcpy(cmsg+CmiReservedHeaderSize, reqData, reqLen);
47                 (c->fnOld)(cmsg);
48         }
49         else { /* Pass read-only copy of data straight to user */
50                 (c->fn)(c->userPtr, reqLen, reqData);
51         }
52 }
53
54 /*Table maps handler name to CcsHandler object*/
55 CpvDeclare(CcsHandlerTable, ccsTab);
56
57 CpvStaticDeclare(CcsImplHeader*,ccsReq);/*Identifies CCS requestor (client)*/
58
59 void CcsRegisterHandler(const char *name, CmiHandler fn) {
60   CcsHandlerRec cp;
61   initHandlerRec(&cp,name);
62   cp.fnOld=fn;
63   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
64 }
65 void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
66   CcsHandlerRec cp;
67   initHandlerRec(&cp,name);
68   cp.fn=fn;
69   cp.userPtr=ptr;
70   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
71 }
72 CcsHandlerRec *CcsGetHandler(const char *name) {
73   return CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
74 }
75 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
76   CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
77   if (rec==NULL) {
78     CmiAbort("CCS: Unknown CCS handler name.\n");
79   }
80   rec->mergeFn=newMerge;
81   rec->redID=CmiGetGlobalReduction();
82 }
83
84 void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
85   CcsImplHeader *hdr;
86   int total = *size;
87   void *reply;
88   char *ptr;
89   int i;
90   for (i=0; i<n; ++i) {
91     hdr = (CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize);
92     total += ChMessageInt(hdr->len);
93   }
94   reply = CmiAlloc(total);
95   memcpy(reply, local, *size);
96   ((CcsImplHeader*)(((char*)reply)+CmiReservedHeaderSize))->len = ChMessageInt_new(total-CmiReservedHeaderSize-sizeof(CcsImplHeader));
97   CmiFree(local);
98   ptr = ((char*)reply)+*size;
99   for (i=0; i<n; ++i) {
100     int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize))->len);
101     memcpy(ptr, ((char*)remote[i])+CmiReservedHeaderSize+sizeof(CcsImplHeader), len);
102     ptr += len;
103   }
104   *size = total;
105   return reply;
106 }
107
108 #define SIMPLE_REDUCTION(name, dataType, loop) \
109 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
110   int i, m; \
111   CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiReservedHeaderSize); \
112   int lenLocal = ChMessageInt(hdrLocal->len); \
113   int nElem = lenLocal / sizeof(dataType); \
114   dataType *ret = (dataType *) (hdrLocal+1); \
115   CcsImplHeader *hdr; \
116   for (m=0; m<n; ++m) { \
117     int len; \
118     dataType *value; \
119     hdr = (CcsImplHeader*)(((char*)remote[m])+CmiReservedHeaderSize); \
120     len = ChMessageInt(hdr->len); \
121     value = (dataType *)(hdr+1); \
122     CmiAssert(lenLocal == len); \
123     for (i=0; i<nElem; ++i) loop; \
124   } \
125   return local; \
126 }
127
128 SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
129 SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
130 SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
131 SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
132
133 /*Use this macro for reductions that have the same type for all inputs */
134 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
135   SIMPLE_REDUCTION(nameBase##_int, int, loop) \
136   SIMPLE_REDUCTION(nameBase##_float, float, loop) \
137   SIMPLE_REDUCTION(nameBase##_double, double, loop)
138
139 SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
140 SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
141 SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
142 SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
143
144 #undef SIMPLE_REDUCTION
145 #undef SIMPLE_POLYMORPH_REDUCTION
146
147 int CcsEnabled(void)
148 {
149   return 1;
150 }
151
152 int CcsIsRemoteRequest(void)
153 {
154   return CpvAccess(ccsReq)!=NULL;
155 }
156
157 void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
158 {
159   *pip = CpvAccess(ccsReq)->attr.ip;
160   *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
161 }
162
163 int rep_fw_handler_idx;
164
165 CcsDelayedReply CcsDelayReply(void)
166 {
167   CcsDelayedReply ret;
168   int len = sizeof(CcsImplHeader);
169   if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
170     len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
171   ret.hdr = (CcsImplHeader*)malloc(len);
172   memcpy(ret.hdr, CpvAccess(ccsReq), len);
173   CpvAccess(ccsReq)=NULL;
174   return ret;
175 }
176
177 void CcsSendReply(int replyLen, const void *replyData)
178 {
179   if (CpvAccess(ccsReq)==NULL)
180     CmiAbort("CcsSendReply: reply already sent!\n");
181   CpvAccess(ccsReq)->len = ChMessageInt_new(1);
182   CcsReply(CpvAccess(ccsReq),replyLen,replyData);
183   CpvAccess(ccsReq) = NULL;
184 }
185
186 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
187 {
188   CcsImplHeader *h = d.hdr;
189   h->len=ChMessageInt_new(1);
190   CcsReply(h,replyLen,replyData);
191   free(h);
192 }
193
194 void CcsNoReply()
195 {
196   if (CpvAccess(ccsReq)==NULL) return;
197   CpvAccess(ccsReq)->len = ChMessageInt_new(0);
198   CcsReply(CpvAccess(ccsReq),0,NULL);
199   CpvAccess(ccsReq) = NULL;
200 }
201
202 void CcsNoDelayedReply(CcsDelayedReply d)
203 {
204   CcsImplHeader *h = d.hdr;
205   h->len = ChMessageInt_new(0);
206   CcsReply(h,0,NULL);
207   free(h);
208 }
209
210
211 /**********************************
212 _CCS Implementation Routines:
213   These do the request forwarding and
214 delivery.
215 ***********************************/
216
217 /*CCS Bottleneck:
218   Deliver the given message data to the given
219 CCS handler.
220 */
221 void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
222 {
223   char *cmsg;
224   int reqLen=ChMessageInt(hdr->len);
225 /*Look up handler's converse ID*/
226   char *handlerStr=hdr->handler;
227   CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
228   if (fn==NULL) {
229     CmiPrintf("CCS: Unknown CCS handler name '%s' requested. Ignoring...\n",
230               hdr->handler);
231     CpvAccess(ccsReq)=hdr;
232     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
233     return;
234  /*   CmiAbort("CCS: Unknown CCS handler name.\n");*/
235   }
236
237 /* Call the handler */
238   CpvAccess(ccsReq)=hdr;
239 #if CMK_CHARMDEBUG
240   if (conditionalPipe[1]!=0 && _conditionalDelivery==0) {
241     /* We are conditionally delivering, send the message to the child and wait for its response */
242     int bytes = reqLen+((int)(reqData-((char*)hdr)))+CmiReservedHeaderSize;
243     write(conditionalPipe[1], &bytes, 4);
244     write(conditionalPipe[1], ((char*)hdr)-CmiReservedHeaderSize, bytes);
245     if (4==read(conditionalPipe[0], &bytes, 4)) {
246       char *buf = malloc(bytes);
247       read(conditionalPipe[0], buf, bytes);
248       CcsSendReply(bytes,buf);
249       free(buf);
250     } else {
251       /* the pipe has been closed */
252       CpdEndConditionalDeliver_master();
253    }
254   }
255   else
256 #endif
257   {
258     callHandlerRec(fn,reqLen,reqData);
259   
260 /*Check if a reply was sent*/
261     if (CpvAccess(ccsReq)!=NULL)
262       CcsSendReply(0,NULL);/*Send an empty reply if not*/
263   }
264 }
265
266 #if ! NODE_0_IS_CONVHOST || CMK_BLUEGENE_CHARM
267 /* The followings are necessary to prevent CCS requests to be processed before
268  * CCS has been initialized. Really it matters only when NODE_0_IS_CONVHOST=0, but
269  * it doesn't hurt having it in the other case as well */
270 static char **bufferedMessages = NULL;
271 static int CcsNumBufferedMsgs = 0;
272 #define CCS_MAX_NUM_BUFFERED_MSGS  100
273
274 void CcsBufferMessage(char *msg) {
275   CmiPrintf("Buffering CCS message\n");
276   CmiAssert(CcsNumBufferedMsgs < CCS_MAX_NUM_BUFFERED_MSGS);
277   if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
278   if (bufferedMessages == NULL) bufferedMessages = malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
279   bufferedMessages[CcsNumBufferedMsgs] = msg;
280   CcsNumBufferedMsgs ++;
281 }
282 #endif
283   
284 /*Unpacks request message to call above routine*/
285 int _ccsHandlerIdx = 0;/*Converse handler index of routine req_fw_handler*/
286
287 #if CMK_BLUEGENE_CHARM
288 CpvDeclare(int, _bgCcsHandlerIdx);
289 CpvDeclare(int, _bgCcsAck);
290 /* This routine is needed when the application is built on top of the bigemulator
291  * layer of Charm. In this case, the real CCS handler must be called within a
292  * worker thread. The function of this function is to receive the CCS message in
293  * the bottom converse layer and forward it to the emulated layer. */
294 static void bg_req_fw_handler(char *msg) {
295   if (CpvAccess(_bgCcsAck) < BgNodeSize()) {
296     CcsBufferMessage(msg);
297     return;
298   }
299   //CmiPrintf("CCS scheduling message\n");
300   /* Get out of the message who is the destination pe */
301   int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
302   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
303   int destPE = (int)ChMessageInt(hdr->pe);
304   if (destPE == -1) destPE = 0;
305   if (destPE < -1) {
306     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
307     destPE = ChMessageInt(pes_nbo[0]);
308   }
309   //CmiAssert(destPE >= 0); // FixME: should cover also broadcast and multicast -> create generic function to extract destpe
310   (((CmiBlueGeneMsgHeader*)msg)->tID) = 0;
311   (((CmiBlueGeneMsgHeader*)msg)->n) = 0;
312   (((CmiBlueGeneMsgHeader*)msg)->flag) = 0;
313   (((CmiBlueGeneMsgHeader*)msg)->t) = 0;
314   (((CmiBlueGeneMsgHeader*)msg)->hID) = CpvAccess(_bgCcsHandlerIdx);
315   /* Get the right thread to deliver to (for now assume it is using CyclicMapInfo) */
316   addBgNodeInbuffer(msg, destPE/CmiNumPes());
317   //CmiPrintf("message CCS added %d to %d\n",((CmiBlueGeneMsgHeader*)msg)->hID, ((CmiBlueGeneMsgHeader*)msg)->tID);
318 }
319 #define req_fw_handler bg_req_fw_handler
320 #endif
321 extern void req_fw_handler(char *msg);
322
323 void CcsReleaseMessages() {
324 #if ! NODE_0_IS_CONVHOST || CMK_BLUEGENE_CHARM
325 #if CMK_BLUEGENE_CHARM
326   if (CpvAccess(_bgCcsAck) == 0 || CpvAccess(_bgCcsAck) < BgNodeSize()) return;
327 #endif
328   if (CcsNumBufferedMsgs > 0) {
329     int i;
330     //CmiPrintf("CCS: %d messages released\n",CcsNumBufferedMsgs);
331     for (i=0; i<CcsNumBufferedMsgs; ++i) {
332       CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
333       CsdEnqueue(bufferedMessages[i]);
334     }
335     free(bufferedMessages);
336     bufferedMessages = NULL;
337     CcsNumBufferedMsgs = -1;
338   }
339 #endif
340 }
341
342 /*Convert CCS header & message data into a converse message 
343  addressed to handler*/
344 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
345 {
346   int reqLen=ChMessageInt(hdr->len);
347   int destPE = ChMessageInt(hdr->pe);
348   int len;
349   char *msg;
350   if (destPE < -1) reqLen -= destPE*sizeof(int);
351   len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+reqLen;
352   msg=(char *)CmiAlloc(len);
353   memcpy(msg+CmiReservedHeaderSize,hdr,sizeof(CcsImplHeader));
354   memcpy(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader),data,reqLen);
355   if (ret_len!=NULL) *ret_len=len;
356   if (_ccsHandlerIdx != 0) {
357     CmiSetHandler(msg, _ccsHandlerIdx);
358     return msg;
359   } else {
360 #if NODE_0_IS_CONVHOST
361     CmiAbort("Why do we need to buffer messages when node 0 is Convhost?");
362 #else
363     CcsBufferMessage(msg);
364     return NULL;
365 #endif
366   }
367 }
368
369 /*Receives reply messages passed up from
370 converse to node 0.*/
371 static void rep_fw_handler(char *msg)
372 {
373   int len;
374   char *r=msg+CmiReservedHeaderSize;
375   CcsImplHeader *hdr=(CcsImplHeader *)r; 
376   r+=sizeof(CcsImplHeader);
377   len=ChMessageInt(hdr->len);
378   CcsImpl_reply(hdr,len,r);
379   CmiFree(msg);
380 }
381
382 #if NODE_0_IS_CONVHOST
383 /************** NODE_0_IS_CONVHOST ***********
384 Non net- versions of charm++ are run without a 
385 (real) conv-host program.  This is fine, except 
386 CCS clients connect via conv-host; so for CCS
387 on non-net- versions of charm++, node 0 carries
388 out the CCS forwarding normally done in conv-host.
389
390 CCS works by listening to a TCP connection on a 
391 port-- the Ccs server socket.  A typical communcation
392 pattern is:
393
394 1.) Random program (CCS client) from the net
395 connects to the CCS server socket and sends
396 a CCS request.
397
398 2.) Node 0 forwards the request to the proper
399 PE as a regular converse message (built in CcsImpl_netReq)
400 for CcsHandleRequest.
401
402 3.) CcsHandleRequest looks up the user's pre-registered
403 CCS handler, and passes the user's handler the request data.
404
405 4.) The user's handler calls CcsSendReply with some
406 reply data; OR finishes without calling CcsSendReply,
407 in which case CcsHandleRequest does it.
408
409 5.) CcsSendReply forwards the reply back to node 0,
410 which sends the reply back to the original requestor,
411 on the (still-open) request socket.
412  */
413
414 /**
415 Send a Ccs reply back to the requestor, down the given socket.
416 Since there is no conv-host, node 0 does all the CCS 
417 communication-- this means all requests come to node 0
418 and are forwarded out; all replies are forwarded back to node 0.
419
420 Note: on Net- versions, CcsImpl_reply is implemented in machine.c
421 */
422 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
423 {
424   const int repPE=0;
425   rep->len=ChMessageInt_new(repLen);
426   if (CmiMyPe()==repPE) {
427     /*Actually deliver reply data*/
428     CcsServer_sendReply(rep,repLen,repData);
429   } else {
430     /*Forward data & socket # to the replyPE*/
431     int len=CmiReservedHeaderSize+
432            sizeof(CcsImplHeader)+repLen;
433     char *msg=CmiAlloc(len);
434     char *r=msg+CmiReservedHeaderSize;
435     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
436     memcpy(r,repData,repLen);
437     CmiSetHandler(msg,rep_fw_handler_idx);
438     CmiSyncSendAndFree(repPE,len,msg);
439   }
440 }
441
442 /*No request will be sent through this socket.
443 Closes it.
444 */
445 /*void CcsImpl_noReply(CcsImplHeader *hdr)
446 {
447   int fd=ChMessageInt(hdr->replyFd);
448   skt_close(fd);
449 }*/
450
451 /**
452  * This is the entrance point of a CCS request into the server.
453  * It is executed only on proc 0, and it forwards the request to the appropriate PE.
454  */
455 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
456 {
457   char *msg;
458   int len,repPE=ChMessageInt(hdr->pe);
459   if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
460 #if ! CMK_BLUEGENE_CHARM
461     /*Treat out of bound values as errors. Helps detecting bugs*/
462     if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
463     else CmiPrintf("Invalid processor index in CCS request.");
464     CpvAccess(ccsReq)=hdr;
465     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
466     return;
467 #endif
468   }
469
470   msg=CcsImpl_ccs2converse(hdr,reqData,&len);
471   if (repPE >= 0) {
472     /* The following %CmiNumPes() follows the assumption that in BigSim the mapping is round-robin */
473     //CmiPrintf("CCS message received for %d\n",repPE);
474     CmiSyncSendAndFree(repPE%CmiNumPes(),len,msg);
475   } else if (repPE == -1) {
476     /* Broadcast to all processors */
477     //CmiPrintf("CCS broadcast received\n");
478     CmiSyncSendAndFree(0,len,msg);
479   } else {
480     /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
481     int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
482     /* The following %CmiNumPes() follows the assumption that in BigSim the mapping is round-robin */
483     //CmiPrintf("CCS multicast received\n");
484     CmiSyncSendAndFree(firstPE%CmiNumPes(),len,msg);
485   }
486 }
487
488 /*
489 We have to run a CCS server socket here on
490 node 0.  To keep the speed impact minimal,
491 we only probe for new connections (with CcsServerCheck)
492 occasionally.  
493  */
494 #include <signal.h>
495 #include "ccs-server.c" /*Include implementation here in this case*/
496 #include "ccs-auth.c"
497
498 /*Check for ready Ccs messages:*/
499 void CcsServerCheck(void)
500 {
501   while (1==skt_select1(CcsServer_fd(),0)) {
502     CcsImplHeader hdr;
503     void *data;
504     /* printf("Got CCS connect...\n"); */
505     if (CcsServer_recvRequest(&hdr,&data))
506     {/*We got a network request*/
507       /* printf("Got CCS request...\n"); */
508       if (! check_stdio_header(&hdr)) {
509         CcsImpl_netRequest(&hdr,data);
510       }
511       free(data);
512     }
513   }
514 }
515
516 #endif /*NODE_0_IS_CONVHOST*/
517
518 int _isCcsHandlerIdx(int hIdx) {
519   if (hIdx==_ccsHandlerIdx) return 1;
520   if (hIdx==rep_fw_handler_idx) return 1;
521   return 0;
522 }
523
524 void CcsBuiltinsInit(char **argv);
525
526 CpvDeclare(int, cmiArgDebugFlag);
527 CpvDeclare(char *, displayArgument);
528 CpvDeclare(int, cpdSuspendStartup);
529
530 void CcsInit(char **argv)
531 {
532   CpvInitialize(CkHashtable_c, ccsTab);
533   CpvAccess(ccsTab) = CkCreateHashtable_string(sizeof(CcsHandlerRec),5);
534   CpvInitialize(CcsImplHeader *, ccsReq);
535   CpvAccess(ccsReq) = NULL;
536   _ccsHandlerIdx = CmiRegisterHandler((CmiHandler)req_fw_handler);
537 #if CMK_BLUEGENE_CHARM
538   CpvInitialize(int, _bgCcsHandlerIdx);
539   CpvAccess(_bgCcsHandlerIdx) = 0;
540   CpvInitialize(int, _bgCcsAck);
541   CpvAccess(_bgCcsAck) = 0;
542 #endif
543   CpvInitialize(int, cmiArgDebugFlag);
544   CpvInitialize(char *, displayArgument);
545   CpvInitialize(int, cpdSuspendStartup);
546   CpvAccess(cmiArgDebugFlag) = 0;
547   CpvAccess(displayArgument) = NULL;
548   CpvAccess(cpdSuspendStartup) = 0;
549   
550   CcsBuiltinsInit(argv);
551
552   rep_fw_handler_idx = CmiRegisterHandler((CmiHandler)rep_fw_handler);
553 #if NODE_0_IS_CONVHOST
554 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
555   print_fw_handler_idx = CmiRegisterHandler((CmiHandler)print_fw_handler);
556 #endif
557   {
558    int ccs_serverPort=0;
559    char *ccs_serverAuth=NULL;
560    
561    if (CmiGetArgFlagDesc(argv,"++server", "Create a CCS server port") | 
562       CmiGetArgIntDesc(argv,"++server-port",&ccs_serverPort, "Listen on this TCP/IP port number") |
563       CmiGetArgStringDesc(argv,"++server-auth",&ccs_serverAuth, "Use this CCS authentication file")) 
564     if (CmiMyPe()==0)
565     {/*Create and occasionally poll on a CCS server port*/
566       CcsServer_new(NULL,&ccs_serverPort,ccs_serverAuth);
567       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)CcsServerCheck,NULL);
568     }
569   }
570 #endif
571   /* if in parallel debug mode i.e ++cpd, freeze */
572   if (CmiGetArgFlagDesc(argv, "+cpd", "Used *only* in conjunction with parallel debugger"))
573   {
574      CpvAccess(cmiArgDebugFlag) = 1;
575      if (CmiGetArgStringDesc(argv, "+DebugDisplay",&(CpvAccess(displayArgument)), "X display for gdb used only in cpd mode"))
576      {
577         if (CpvAccess(displayArgument) == NULL)
578             CmiPrintf("WARNING> NULL parameter for +DebugDisplay\n***");
579      }
580      else if (CmiMyPe() == 0)
581      {
582             /* only one processor prints the warning */
583             CmiPrintf("WARNING> x term for gdb needs to be specified as +DebugDisplay by debugger\n***\n");
584      }
585
586      if (CmiGetArgFlagDesc(argv, "+DebugSuspend", "Suspend execution at beginning of program")) {
587        CpvAccess(cpdSuspendStartup) = 1;
588      }
589   }
590
591   CcsReleaseMessages();
592 }
593
594 #endif /*CMK_CCS_AVAILABLE*/
595