Merge branch 'conditionalDelivery' into charm
[charm.git] / src / conv-ccs / conv-ccs.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <errno.h>
11 #include <string.h>
12
13 #include "converse.h"
14 #include "conv-ccs.h"
15 #include "ccs-server.h"
16 #include "sockRoutines.h"
17 #include "queueing.h"
18
19 #if CMK_CCS_AVAILABLE
20
21 /*****************************************************************************
22  *
23  * Converse Client-Server Functions
24  *
25  *****************************************************************************/
26  
27 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
28   if (strlen(name)>=CCS_MAXHANDLER) 
29         CmiAbort("CCS handler names cannot exceed 32 characters");
30   c->name=strdup(name);
31   c->fn=NULL;
32   c->fnOld=NULL;
33   c->userPtr=NULL;
34   c->mergeFn=NULL;
35   c->nCalls=0;
36 }
37
38 static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
39         c->nCalls++;
40         if (c->fnOld) 
41         { /* Backward compatability version:
42             Pack user data into a converse message (cripes! why bother?);
43             user will delete the message. 
44           */
45                 char *cmsg = (char *) CmiAlloc(CmiReservedHeaderSize+reqLen);
46                 memcpy(cmsg+CmiReservedHeaderSize, reqData, reqLen);
47                 (c->fnOld)(cmsg);
48         }
49         else { /* Pass read-only copy of data straight to user */
50                 (c->fn)(c->userPtr, reqLen, reqData);
51         }
52 }
53
54 /*Table maps handler name to CcsHandler object*/
55 CpvDeclare(CcsHandlerTable, ccsTab);
56
57 CpvStaticDeclare(CcsImplHeader*,ccsReq);/*Identifies CCS requestor (client)*/
58
59 void CcsRegisterHandler(const char *name, CmiHandler fn) {
60   CcsHandlerRec cp;
61   initHandlerRec(&cp,name);
62   cp.fnOld=fn;
63   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
64 }
65 void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
66   CcsHandlerRec cp;
67   initHandlerRec(&cp,name);
68   cp.fn=fn;
69   cp.userPtr=ptr;
70   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
71 }
72 CcsHandlerRec *CcsGetHandler(const char *name) {
73   return CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
74 }
75 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
76   CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
77   if (rec==NULL) {
78     CmiAbort("CCS: Unknown CCS handler name.\n");
79   }
80   rec->mergeFn=newMerge;
81   rec->redID=CmiGetGlobalReduction();
82 }
83
84 void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
85   CcsImplHeader *hdr;
86   int total = *size;
87   void *reply;
88   char *ptr;
89   int i;
90   for (i=0; i<n; ++i) {
91     hdr = (CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize);
92     total += ChMessageInt(hdr->len);
93   }
94   reply = CmiAlloc(total);
95   memcpy(reply, local, *size);
96   ((CcsImplHeader*)(((char*)reply)+CmiReservedHeaderSize))->len = ChMessageInt_new(total-CmiReservedHeaderSize-sizeof(CcsImplHeader));
97   CmiFree(local);
98   ptr = ((char*)reply)+*size;
99   for (i=0; i<n; ++i) {
100     int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize))->len);
101     memcpy(ptr, ((char*)remote[i])+CmiReservedHeaderSize+sizeof(CcsImplHeader), len);
102     ptr += len;
103   }
104   *size = total;
105   return reply;
106 }
107
108 #define SIMPLE_REDUCTION(name, dataType, loop) \
109 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
110   int i, m; \
111   CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiReservedHeaderSize); \
112   int lenLocal = ChMessageInt(hdrLocal->len); \
113   int nElem = lenLocal / sizeof(dataType); \
114   dataType *ret = (dataType *) (hdrLocal+1); \
115   CcsImplHeader *hdr; \
116   for (m=0; m<n; ++m) { \
117     int len; \
118     dataType *value; \
119     hdr = (CcsImplHeader*)(((char*)remote[m])+CmiReservedHeaderSize); \
120     len = ChMessageInt(hdr->len); \
121     value = (dataType *)(hdr+1); \
122     CmiAssert(lenLocal == len); \
123     for (i=0; i<nElem; ++i) loop; \
124   } \
125   return local; \
126 }
127
128 SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
129 SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
130 SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
131 SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
132
133 /*Use this macro for reductions that have the same type for all inputs */
134 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
135   SIMPLE_REDUCTION(nameBase##_int, int, loop) \
136   SIMPLE_REDUCTION(nameBase##_float, float, loop) \
137   SIMPLE_REDUCTION(nameBase##_double, double, loop)
138
139 SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
140 SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
141 SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
142 SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
143
144 #undef SIMPLE_REDUCTION
145 #undef SIMPLE_POLYMORPH_REDUCTION
146
147 int CcsEnabled(void)
148 {
149   return 1;
150 }
151
152 int CcsIsRemoteRequest(void)
153 {
154   return CpvAccess(ccsReq)!=NULL;
155 }
156
157 void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
158 {
159   *pip = CpvAccess(ccsReq)->attr.ip;
160   *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
161 }
162
163 int rep_fw_handler_idx;
164
165 CcsDelayedReply CcsDelayReply(void)
166 {
167   CcsDelayedReply ret;
168   int len = sizeof(CcsImplHeader);
169   if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
170     len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
171   ret.hdr = (CcsImplHeader*)malloc(len);
172   memcpy(ret.hdr, CpvAccess(ccsReq), len);
173   CpvAccess(ccsReq)=NULL;
174   return ret;
175 }
176
177 void CcsSendReply(int replyLen, const void *replyData)
178 {
179   if (CpvAccess(ccsReq)==NULL)
180     CmiAbort("CcsSendReply: reply already sent!\n");
181   CpvAccess(ccsReq)->len = ChMessageInt_new(1);
182   CcsReply(CpvAccess(ccsReq),replyLen,replyData);
183   CpvAccess(ccsReq) = NULL;
184 }
185
186 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
187 {
188   CcsImplHeader *h = d.hdr;
189   h->len=ChMessageInt_new(1);
190   CcsReply(h,replyLen,replyData);
191   free(h);
192 }
193
194 void CcsNoReply()
195 {
196   if (CpvAccess(ccsReq)==NULL) return;
197   CpvAccess(ccsReq)->len = ChMessageInt_new(0);
198   CcsReply(CpvAccess(ccsReq),0,NULL);
199   CpvAccess(ccsReq) = NULL;
200 }
201
202 void CcsNoDelayedReply(CcsDelayedReply d)
203 {
204   CcsImplHeader *h = d.hdr;
205   h->len = ChMessageInt_new(0);
206   CcsReply(h,0,NULL);
207   free(h);
208 }
209
210
211 /**********************************
212 _CCS Implementation Routines:
213   These do the request forwarding and
214 delivery.
215 ***********************************/
216
217 /*CCS Bottleneck:
218   Deliver the given message data to the given
219 CCS handler.
220 */
221 void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
222 {
223   char *cmsg;
224   int reqLen=ChMessageInt(hdr->len);
225 /*Look up handler's converse ID*/
226   char *handlerStr=hdr->handler;
227   CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
228   if (fn==NULL) {
229     CmiPrintf("CCS: Unknown CCS handler name '%s' requested. Ignoring...\n",
230               hdr->handler);
231     CpvAccess(ccsReq)=hdr;
232     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
233     return;
234  /*   CmiAbort("CCS: Unknown CCS handler name.\n");*/
235   }
236
237 /* Call the handler */
238   CpvAccess(ccsReq)=hdr;
239 #if CMK_CHARMDEBUG
240   if (conditionalPipe[1]!=0 && _conditionalDelivery==0) {
241     /* We are conditionally delivering, the message has been sent to the child, wait for its response */
242     int bytes;
243     if (4==read(conditionalPipe[0], &bytes, 4)) {
244       char *buf = malloc(bytes);
245       read(conditionalPipe[0], buf, bytes);
246       CcsSendReply(bytes,buf);
247       free(buf);
248     } else {
249       /* the pipe has been closed */
250       CpdEndConditionalDeliver_master();
251    }
252   }
253   else
254 #endif
255   {
256     callHandlerRec(fn,reqLen,reqData);
257   
258 /*Check if a reply was sent*/
259     if (CpvAccess(ccsReq)!=NULL)
260       CcsSendReply(0,NULL);/*Send an empty reply if not*/
261   }
262 }
263
264 #if ! NODE_0_IS_CONVHOST || CMK_BLUEGENE_CHARM
265 /* The followings are necessary to prevent CCS requests to be processed before
266  * CCS has been initialized. Really it matters only when NODE_0_IS_CONVHOST=0, but
267  * it doesn't hurt having it in the other case as well */
268 static char **bufferedMessages = NULL;
269 static int CcsNumBufferedMsgs = 0;
270 #define CCS_MAX_NUM_BUFFERED_MSGS  100
271
272 void CcsBufferMessage(char *msg) {
273   CmiPrintf("Buffering CCS message\n");
274   CmiAssert(CcsNumBufferedMsgs < CCS_MAX_NUM_BUFFERED_MSGS);
275   if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
276   if (bufferedMessages == NULL) bufferedMessages = malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
277   bufferedMessages[CcsNumBufferedMsgs] = msg;
278   CcsNumBufferedMsgs ++;
279 }
280 #endif
281   
282 /*Unpacks request message to call above routine*/
283 int _ccsHandlerIdx = 0;/*Converse handler index of routine req_fw_handler*/
284
285 #if CMK_BLUEGENE_CHARM
286 CpvDeclare(int, _bgCcsHandlerIdx);
287 CpvDeclare(int, _bgCcsAck);
288 /* This routine is needed when the application is built on top of the bigemulator
289  * layer of Charm. In this case, the real CCS handler must be called within a
290  * worker thread. The function of this function is to receive the CCS message in
291  * the bottom converse layer and forward it to the emulated layer. */
292 static void bg_req_fw_handler(char *msg) {
293   if (CpvAccess(_bgCcsAck) < BgNodeSize()) {
294     CcsBufferMessage(msg);
295     return;
296   }
297   //CmiPrintf("CCS scheduling message\n");
298   /* Get out of the message who is the destination pe */
299   int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
300   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
301   int destPE = (int)ChMessageInt(hdr->pe);
302   if (destPE == -1) destPE = 0;
303   if (destPE < -1) {
304     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
305     destPE = ChMessageInt(pes_nbo[0]);
306   }
307   //CmiAssert(destPE >= 0); // FixME: should cover also broadcast and multicast -> create generic function to extract destpe
308   (((CmiBlueGeneMsgHeader*)msg)->tID) = 0;
309   (((CmiBlueGeneMsgHeader*)msg)->n) = 0;
310   (((CmiBlueGeneMsgHeader*)msg)->flag) = 0;
311   (((CmiBlueGeneMsgHeader*)msg)->t) = 0;
312   (((CmiBlueGeneMsgHeader*)msg)->hID) = CpvAccess(_bgCcsHandlerIdx);
313   /* Get the right thread to deliver to (for now assume it is using CyclicMapInfo) */
314   addBgNodeInbuffer(msg, destPE/CmiNumPes());
315   //CmiPrintf("message CCS added %d to %d\n",((CmiBlueGeneMsgHeader*)msg)->hID, ((CmiBlueGeneMsgHeader*)msg)->tID);
316 }
317 #define req_fw_handler bg_req_fw_handler
318 #endif
319 extern void req_fw_handler(char *msg);
320
321 void CcsReleaseMessages() {
322 #if ! NODE_0_IS_CONVHOST || CMK_BLUEGENE_CHARM
323 #if CMK_BLUEGENE_CHARM
324   if (CpvAccess(_bgCcsAck) == 0 || CpvAccess(_bgCcsAck) < BgNodeSize()) return;
325 #endif
326   if (CcsNumBufferedMsgs > 0) {
327     int i;
328     //CmiPrintf("CCS: %d messages released\n",CcsNumBufferedMsgs);
329     for (i=0; i<CcsNumBufferedMsgs; ++i) {
330       CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
331       CsdEnqueue(bufferedMessages[i]);
332     }
333     free(bufferedMessages);
334     bufferedMessages = NULL;
335     CcsNumBufferedMsgs = -1;
336   }
337 #endif
338 }
339
340 /*Convert CCS header & message data into a converse message 
341  addressed to handler*/
342 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
343 {
344   int reqLen=ChMessageInt(hdr->len);
345   int destPE = ChMessageInt(hdr->pe);
346   int len;
347   char *msg;
348   if (destPE < -1) reqLen -= destPE*sizeof(int);
349   len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+reqLen;
350   msg=(char *)CmiAlloc(len);
351   memcpy(msg+CmiReservedHeaderSize,hdr,sizeof(CcsImplHeader));
352   memcpy(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader),data,reqLen);
353   if (ret_len!=NULL) *ret_len=len;
354   if (_ccsHandlerIdx != 0) {
355     CmiSetHandler(msg, _ccsHandlerIdx);
356     return msg;
357   } else {
358 #if NODE_0_IS_CONVHOST
359     CmiAbort("Why do we need to buffer messages when node 0 is Convhost?");
360 #else
361     CcsBufferMessage(msg);
362     return NULL;
363 #endif
364   }
365 }
366
367 /*Receives reply messages passed up from
368 converse to node 0.*/
369 static void rep_fw_handler(char *msg)
370 {
371   int len;
372   char *r=msg+CmiReservedHeaderSize;
373   CcsImplHeader *hdr=(CcsImplHeader *)r; 
374   r+=sizeof(CcsImplHeader);
375   len=ChMessageInt(hdr->len);
376   CcsImpl_reply(hdr,len,r);
377   CmiFree(msg);
378 }
379
380 #if NODE_0_IS_CONVHOST
381 /************** NODE_0_IS_CONVHOST ***********
382 Non net- versions of charm++ are run without a 
383 (real) conv-host program.  This is fine, except 
384 CCS clients connect via conv-host; so for CCS
385 on non-net- versions of charm++, node 0 carries
386 out the CCS forwarding normally done in conv-host.
387
388 CCS works by listening to a TCP connection on a 
389 port-- the Ccs server socket.  A typical communcation
390 pattern is:
391
392 1.) Random program (CCS client) from the net
393 connects to the CCS server socket and sends
394 a CCS request.
395
396 2.) Node 0 forwards the request to the proper
397 PE as a regular converse message (built in CcsImpl_netReq)
398 for CcsHandleRequest.
399
400 3.) CcsHandleRequest looks up the user's pre-registered
401 CCS handler, and passes the user's handler the request data.
402
403 4.) The user's handler calls CcsSendReply with some
404 reply data; OR finishes without calling CcsSendReply,
405 in which case CcsHandleRequest does it.
406
407 5.) CcsSendReply forwards the reply back to node 0,
408 which sends the reply back to the original requestor,
409 on the (still-open) request socket.
410  */
411
412 /**
413 Send a Ccs reply back to the requestor, down the given socket.
414 Since there is no conv-host, node 0 does all the CCS 
415 communication-- this means all requests come to node 0
416 and are forwarded out; all replies are forwarded back to node 0.
417
418 Note: on Net- versions, CcsImpl_reply is implemented in machine.c
419 */
420 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
421 {
422   const int repPE=0;
423   rep->len=ChMessageInt_new(repLen);
424   if (CmiMyPe()==repPE) {
425     /*Actually deliver reply data*/
426     CcsServer_sendReply(rep,repLen,repData);
427   } else {
428     /*Forward data & socket # to the replyPE*/
429     int len=CmiReservedHeaderSize+
430            sizeof(CcsImplHeader)+repLen;
431     char *msg=CmiAlloc(len);
432     char *r=msg+CmiReservedHeaderSize;
433     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
434     memcpy(r,repData,repLen);
435     CmiSetHandler(msg,rep_fw_handler_idx);
436     CmiSyncSendAndFree(repPE,len,msg);
437   }
438 }
439
440 /*No request will be sent through this socket.
441 Closes it.
442 */
443 /*void CcsImpl_noReply(CcsImplHeader *hdr)
444 {
445   int fd=ChMessageInt(hdr->replyFd);
446   skt_close(fd);
447 }*/
448
449 /**
450  * This is the entrance point of a CCS request into the server.
451  * It is executed only on proc 0, and it forwards the request to the appropriate PE.
452  */
453 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
454 {
455   char *msg;
456   int len,repPE=ChMessageInt(hdr->pe);
457   if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
458 #if ! CMK_BLUEGENE_CHARM
459     /*Treat out of bound values as errors. Helps detecting bugs*/
460     if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
461     else CmiPrintf("Invalid processor index in CCS request.");
462     CpvAccess(ccsReq)=hdr;
463     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
464     return;
465 #endif
466   }
467
468   msg=CcsImpl_ccs2converse(hdr,reqData,&len);
469   if (repPE >= 0) {
470     /* The following %CmiNumPes() follows the assumption that in BigSim the mapping is round-robin */
471     //CmiPrintf("CCS message received for %d\n",repPE);
472     CmiSyncSendAndFree(repPE%CmiNumPes(),len,msg);
473   } else if (repPE == -1) {
474     /* Broadcast to all processors */
475     //CmiPrintf("CCS broadcast received\n");
476     CmiSyncSendAndFree(0,len,msg);
477   } else {
478     /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
479     int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
480     /* The following %CmiNumPes() follows the assumption that in BigSim the mapping is round-robin */
481     //CmiPrintf("CCS multicast received\n");
482     CmiSyncSendAndFree(firstPE%CmiNumPes(),len,msg);
483   }
484 }
485
486 /*
487 We have to run a CCS server socket here on
488 node 0.  To keep the speed impact minimal,
489 we only probe for new connections (with CcsServerCheck)
490 occasionally.  
491  */
492 #include <signal.h>
493 #include "ccs-server.c" /*Include implementation here in this case*/
494 #include "ccs-auth.c"
495
496 /*Check for ready Ccs messages:*/
497 void CcsServerCheck(void)
498 {
499   while (1==skt_select1(CcsServer_fd(),0)) {
500     CcsImplHeader hdr;
501     void *data;
502     /* printf("Got CCS connect...\n"); */
503     if (CcsServer_recvRequest(&hdr,&data))
504     {/*We got a network request*/
505       /* printf("Got CCS request...\n"); */
506       if (! check_stdio_header(&hdr)) {
507         CcsImpl_netRequest(&hdr,data);
508       }
509       free(data);
510     }
511   }
512 }
513
514 #endif /*NODE_0_IS_CONVHOST*/
515
516 int _isCcsHandlerIdx(int hIdx) {
517   if (hIdx==_ccsHandlerIdx) return 1;
518   if (hIdx==rep_fw_handler_idx) return 1;
519   return 0;
520 }
521
522 void CcsBuiltinsInit(char **argv);
523
524 CpvDeclare(int, cmiArgDebugFlag);
525 CpvDeclare(char *, displayArgument);
526 CpvDeclare(int, cpdSuspendStartup);
527
528 void CcsInit(char **argv)
529 {
530   CpvInitialize(CkHashtable_c, ccsTab);
531   CpvAccess(ccsTab) = CkCreateHashtable_string(sizeof(CcsHandlerRec),5);
532   CpvInitialize(CcsImplHeader *, ccsReq);
533   CpvAccess(ccsReq) = NULL;
534   _ccsHandlerIdx = CmiRegisterHandler((CmiHandler)req_fw_handler);
535 #if CMK_BLUEGENE_CHARM
536   CpvInitialize(int, _bgCcsHandlerIdx);
537   CpvAccess(_bgCcsHandlerIdx) = 0;
538   CpvInitialize(int, _bgCcsAck);
539   CpvAccess(_bgCcsAck) = 0;
540 #endif
541   CpvInitialize(int, cmiArgDebugFlag);
542   CpvInitialize(char *, displayArgument);
543   CpvInitialize(int, cpdSuspendStartup);
544   CpvAccess(cmiArgDebugFlag) = 0;
545   CpvAccess(displayArgument) = NULL;
546   CpvAccess(cpdSuspendStartup) = 0;
547   
548   CcsBuiltinsInit(argv);
549
550   rep_fw_handler_idx = CmiRegisterHandler((CmiHandler)rep_fw_handler);
551 #if NODE_0_IS_CONVHOST
552 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
553   print_fw_handler_idx = CmiRegisterHandler((CmiHandler)print_fw_handler);
554 #endif
555   {
556    int ccs_serverPort=0;
557    char *ccs_serverAuth=NULL;
558    
559    if (CmiGetArgFlagDesc(argv,"++server", "Create a CCS server port") | 
560       CmiGetArgIntDesc(argv,"++server-port",&ccs_serverPort, "Listen on this TCP/IP port number") |
561       CmiGetArgStringDesc(argv,"++server-auth",&ccs_serverAuth, "Use this CCS authentication file")) 
562     if (CmiMyPe()==0)
563     {/*Create and occasionally poll on a CCS server port*/
564       CcsServer_new(NULL,&ccs_serverPort,ccs_serverAuth);
565       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)CcsServerCheck,NULL);
566     }
567   }
568 #endif
569   /* if in parallel debug mode i.e ++cpd, freeze */
570   if (CmiGetArgFlagDesc(argv, "+cpd", "Used *only* in conjunction with parallel debugger"))
571   {
572      CpvAccess(cmiArgDebugFlag) = 1;
573      if (CmiGetArgStringDesc(argv, "+DebugDisplay",&(CpvAccess(displayArgument)), "X display for gdb used only in cpd mode"))
574      {
575         if (CpvAccess(displayArgument) == NULL)
576             CmiPrintf("WARNING> NULL parameter for +DebugDisplay\n***");
577      }
578      else if (CmiMyPe() == 0)
579      {
580             /* only one processor prints the warning */
581             CmiPrintf("WARNING> x term for gdb needs to be specified as +DebugDisplay by debugger\n***\n");
582      }
583
584      if (CmiGetArgFlagDesc(argv, "+DebugSuspend", "Suspend execution at beginning of program")) {
585        CpvAccess(cpdSuspendStartup) = 1;
586      }
587   }
588
589   CcsReleaseMessages();
590 }
591
592 #endif /*CMK_CCS_AVAILABLE*/
593