revert changes in CmiCopyArgs and LB
[charm.git] / src / conv-ccs / conv-ccs.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <errno.h>
4 #include <string.h>
5
6 #include "converse.h"
7 #include "conv-ccs.h"
8 #include "ccs-server.h"
9 #include "sockRoutines.h"
10 #include "queueing.h"
11
12 #if CMK_CCS_AVAILABLE
13
14 /*****************************************************************************
15  *
16  * Converse Client-Server Functions
17  *
18  *****************************************************************************/
19  
20 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
21   if (strlen(name)>=CCS_MAXHANDLER) 
22         CmiAbort("CCS handler names cannot exceed 32 characters");
23   c->name=strdup(name);
24   c->fn=NULL;
25   c->fnOld=NULL;
26   c->userPtr=NULL;
27   c->mergeFn=NULL;
28   c->nCalls=0;
29 }
30
31 static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
32         c->nCalls++;
33         if (c->fnOld) 
34         { /* Backward compatability version:
35             Pack user data into a converse message (cripes! why bother?);
36             user will delete the message. 
37           */
38                 char *cmsg = (char *) CmiAlloc(CmiReservedHeaderSize+reqLen);
39                 memcpy(cmsg+CmiReservedHeaderSize, reqData, reqLen);
40                 (c->fnOld)(cmsg);
41         }
42         else { /* Pass read-only copy of data straight to user */
43                 (c->fn)(c->userPtr, reqLen, reqData);
44         }
45 }
46
47 /*Table maps handler name to CcsHandler object*/
48 CpvDeclare(CcsHandlerTable, ccsTab);
49
50 CpvStaticDeclare(CcsImplHeader*,ccsReq);/*Identifies CCS requestor (client)*/
51
52 void CcsRegisterHandler(const char *name, CmiHandler fn) {
53   CcsHandlerRec cp;
54   initHandlerRec(&cp,name);
55   cp.fnOld=fn;
56   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
57 }
58 void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
59   CcsHandlerRec cp;
60   initHandlerRec(&cp,name);
61   cp.fn=fn;
62   cp.userPtr=ptr;
63   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
64 }
65 CcsHandlerRec *CcsGetHandler(const char *name) {
66   return CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
67 }
68 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
69   CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
70   if (rec==NULL) {
71     CmiAbort("CCS: Unknown CCS handler name.\n");
72   }
73   rec->mergeFn=newMerge;
74   rec->redID=CmiGetGlobalReduction();
75 }
76
77 void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
78   CcsImplHeader *hdr;
79   int total = *size;
80   void *reply;
81   char *ptr;
82   int i;
83   for (i=0; i<n; ++i) {
84     hdr = (CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize);
85     total += ChMessageInt(hdr->len);
86   }
87   reply = CmiAlloc(total);
88   memcpy(reply, local, *size);
89   ((CcsImplHeader*)(((char*)reply)+CmiReservedHeaderSize))->len = ChMessageInt_new(total-CmiReservedHeaderSize-sizeof(CcsImplHeader));
90   CmiFree(local);
91   ptr = ((char*)reply)+*size;
92   for (i=0; i<n; ++i) {
93     int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize))->len);
94     memcpy(ptr, ((char*)remote[i])+CmiReservedHeaderSize+sizeof(CcsImplHeader), len);
95     ptr += len;
96   }
97   *size = total;
98   return reply;
99 }
100
101 #define SIMPLE_REDUCTION(name, dataType, loop) \
102 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
103   int i, m; \
104   CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiReservedHeaderSize); \
105   int lenLocal = ChMessageInt(hdrLocal->len); \
106   int nElem = lenLocal / sizeof(dataType); \
107   dataType *ret = (dataType *) (hdrLocal+1); \
108   CcsImplHeader *hdr; \
109   for (m=0; m<n; ++m) { \
110     int len; \
111     dataType *value; \
112     hdr = (CcsImplHeader*)(((char*)remote[m])+CmiReservedHeaderSize); \
113     len = ChMessageInt(hdr->len); \
114     value = (dataType *)(hdr+1); \
115     CmiAssert(lenLocal == len); \
116     for (i=0; i<nElem; ++i) loop; \
117   } \
118   return local; \
119 }
120
121 SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
122 SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
123 SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
124 SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
125
126 /*Use this macro for reductions that have the same type for all inputs */
127 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
128   SIMPLE_REDUCTION(nameBase##_int, int, loop) \
129   SIMPLE_REDUCTION(nameBase##_float, float, loop) \
130   SIMPLE_REDUCTION(nameBase##_double, double, loop)
131
132 SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
133 SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
134 SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
135 SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
136
137 #undef SIMPLE_REDUCTION
138 #undef SIMPLE_POLYMORPH_REDUCTION
139
140 int CcsEnabled(void)
141 {
142   return 1;
143 }
144
145 int CcsIsRemoteRequest(void)
146 {
147   return CpvAccess(ccsReq)!=NULL;
148 }
149
150 void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
151 {
152   *pip = CpvAccess(ccsReq)->attr.ip;
153   *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
154 }
155
156 int rep_fw_handler_idx;
157
158 CcsDelayedReply CcsDelayReply(void)
159 {
160   CcsDelayedReply ret;
161   int len = sizeof(CcsImplHeader);
162   if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
163     len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
164   ret.hdr = (CcsImplHeader*)malloc(len);
165   memcpy(ret.hdr, CpvAccess(ccsReq), len);
166   CpvAccess(ccsReq)=NULL;
167   return ret;
168 }
169
170 void CcsSendReply(int replyLen, const void *replyData)
171 {
172   if (CpvAccess(ccsReq)==NULL)
173     CmiAbort("CcsSendReply: reply already sent!\n");
174   CpvAccess(ccsReq)->len = ChMessageInt_new(1);
175   CcsReply(CpvAccess(ccsReq),replyLen,replyData);
176   CpvAccess(ccsReq) = NULL;
177 }
178
179 void CcsSendReplyNoError(int replyLen, const void *replyData) {
180   if (CpvAccess(ccsReq)==NULL) return;
181   CcsSendReply(replyLen, replyData);
182 }
183
184 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
185 {
186   CcsImplHeader *h = d.hdr;
187   h->len=ChMessageInt_new(1);
188   CcsReply(h,replyLen,replyData);
189   free(h);
190 }
191
192 void CcsNoReply()
193 {
194   if (CpvAccess(ccsReq)==NULL) return;
195   CpvAccess(ccsReq)->len = ChMessageInt_new(0);
196   CcsReply(CpvAccess(ccsReq),0,NULL);
197   CpvAccess(ccsReq) = NULL;
198 }
199
200 void CcsNoDelayedReply(CcsDelayedReply d)
201 {
202   CcsImplHeader *h = d.hdr;
203   h->len = ChMessageInt_new(0);
204   CcsReply(h,0,NULL);
205   free(h);
206 }
207
208
209 /**********************************
210 _CCS Implementation Routines:
211   These do the request forwarding and
212 delivery.
213 ***********************************/
214
215 /*CCS Bottleneck:
216   Deliver the given message data to the given
217 CCS handler.
218 */
219 void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
220 {
221   char *cmsg;
222   int reqLen=ChMessageInt(hdr->len);
223 /*Look up handler's converse ID*/
224   char *handlerStr=hdr->handler;
225   CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
226   if (fn==NULL) {
227     CmiPrintf("CCS: Unknown CCS handler name '%s' requested. Ignoring...\n",
228               hdr->handler);
229     CpvAccess(ccsReq)=hdr;
230     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
231     return;
232  /*   CmiAbort("CCS: Unknown CCS handler name.\n");*/
233   }
234
235 /* Call the handler */
236   CpvAccess(ccsReq)=hdr;
237 #if CMK_CHARMDEBUG
238   if (conditionalPipe[1]!=0 && _conditionalDelivery==0) {
239     /* We are conditionally delivering, the message has been sent to the child, wait for its response */
240     int bytes;
241     if (4==read(conditionalPipe[0], &bytes, 4)) {
242       char *buf = malloc(bytes);
243       read(conditionalPipe[0], buf, bytes);
244       CcsSendReply(bytes,buf);
245       free(buf);
246     } else {
247       /* the pipe has been closed */
248       CpdEndConditionalDeliver_master();
249    }
250   }
251   else
252 #endif
253   {
254     callHandlerRec(fn,reqLen,reqData);
255   
256 /*Check if a reply was sent*/
257     if (CpvAccess(ccsReq)!=NULL)
258       CcsSendReply(0,NULL);/*Send an empty reply if not*/
259   }
260 }
261
262 #if ! NODE_0_IS_CONVHOST || CMK_BIGSIM_CHARM
263 /* The followings are necessary to prevent CCS requests to be processed before
264  * CCS has been initialized. Really it matters only when NODE_0_IS_CONVHOST=0, but
265  * it doesn't hurt having it in the other case as well */
266 static char **bufferedMessages = NULL;
267 static int CcsNumBufferedMsgs = 0;
268 #define CCS_MAX_NUM_BUFFERED_MSGS  100
269
270 void CcsBufferMessage(char *msg) {
271   CmiPrintf("Buffering CCS message\n");
272   CmiAssert(CcsNumBufferedMsgs < CCS_MAX_NUM_BUFFERED_MSGS);
273   if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
274   if (bufferedMessages == NULL) bufferedMessages = malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
275   bufferedMessages[CcsNumBufferedMsgs] = msg;
276   CcsNumBufferedMsgs ++;
277 }
278 #endif
279   
280 /*Unpacks request message to call above routine*/
281 int _ccsHandlerIdx = 0;/*Converse handler index of routine req_fw_handler*/
282
283 #if CMK_BIGSIM_CHARM
284 CpvDeclare(int, _bgCcsHandlerIdx);
285 CpvDeclare(int, _bgCcsAck);
286 /* This routine is needed when the application is built on top of the bigemulator
287  * layer of Charm. In this case, the real CCS handler must be called within a
288  * worker thread. The function of this function is to receive the CCS message in
289  * the bottom converse layer and forward it to the emulated layer. */
290 static void bg_req_fw_handler(char *msg) {
291   if (CpvAccess(_bgCcsAck) < BgNodeSize()) {
292     CcsBufferMessage(msg);
293     return;
294   }
295   //CmiPrintf("CCS scheduling message\n");
296   /* Get out of the message who is the destination pe */
297   int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
298   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
299   int destPE = (int)ChMessageInt(hdr->pe);
300   if (destPE == -1) destPE = 0;
301   if (destPE < -1) {
302     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
303     destPE = ChMessageInt(pes_nbo[0]);
304   }
305   //CmiAssert(destPE >= 0); // FixME: should cover also broadcast and multicast -> create generic function to extract destpe
306   (((CmiBlueGeneMsgHeader*)msg)->tID) = 0;
307   (((CmiBlueGeneMsgHeader*)msg)->n) = 0;
308   (((CmiBlueGeneMsgHeader*)msg)->flag) = 0;
309   (((CmiBlueGeneMsgHeader*)msg)->t) = 0;
310   (((CmiBlueGeneMsgHeader*)msg)->hID) = CpvAccess(_bgCcsHandlerIdx);
311   /* Get the right thread to deliver to (for now assume it is using CyclicMapInfo) */
312   addBgNodeInbuffer(msg, destPE/CmiNumPes());
313   //CmiPrintf("message CCS added %d to %d\n",((CmiBlueGeneMsgHeader*)msg)->hID, ((CmiBlueGeneMsgHeader*)msg)->tID);
314 }
315 #define req_fw_handler bg_req_fw_handler
316 #endif
317 extern void req_fw_handler(char *msg);
318
319 void CcsReleaseMessages() {
320 #if ! NODE_0_IS_CONVHOST || CMK_BIGSIM_CHARM
321 #if CMK_BIGSIM_CHARM
322   if (CpvAccess(_bgCcsAck) == 0 || CpvAccess(_bgCcsAck) < BgNodeSize()) return;
323 #endif
324   if (CcsNumBufferedMsgs > 0) {
325     int i;
326     //CmiPrintf("CCS: %d messages released\n",CcsNumBufferedMsgs);
327     for (i=0; i<CcsNumBufferedMsgs; ++i) {
328       CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
329       CsdEnqueue(bufferedMessages[i]);
330     }
331     free(bufferedMessages);
332     bufferedMessages = NULL;
333     CcsNumBufferedMsgs = -1;
334   }
335 #endif
336 }
337
338 /*Convert CCS header & message data into a converse message 
339  addressed to handler*/
340 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
341 {
342   int reqLen=ChMessageInt(hdr->len);
343   int destPE = ChMessageInt(hdr->pe);
344   int len;
345   char *msg;
346   if (destPE < -1) reqLen -= destPE*sizeof(int);
347   len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+reqLen;
348   msg=(char *)CmiAlloc(len);
349   memcpy(msg+CmiReservedHeaderSize,hdr,sizeof(CcsImplHeader));
350   memcpy(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader),data,reqLen);
351   if (ret_len!=NULL) *ret_len=len;
352   if (_ccsHandlerIdx != 0) {
353     CmiSetHandler(msg, _ccsHandlerIdx);
354     return msg;
355   } else {
356 #if NODE_0_IS_CONVHOST
357     CmiAbort("Why do we need to buffer messages when node 0 is Convhost?");
358 #else
359     CcsBufferMessage(msg);
360     return NULL;
361 #endif
362   }
363 }
364
365 /*Receives reply messages passed up from
366 converse to node 0.*/
367 static void rep_fw_handler(char *msg)
368 {
369   int len;
370   char *r=msg+CmiReservedHeaderSize;
371   CcsImplHeader *hdr=(CcsImplHeader *)r; 
372   r+=sizeof(CcsImplHeader);
373   len=ChMessageInt(hdr->len);
374   CcsImpl_reply(hdr,len,r);
375   CmiFree(msg);
376 }
377
378 #if NODE_0_IS_CONVHOST
379 /************** NODE_0_IS_CONVHOST ***********
380 Non net- versions of charm++ are run without a 
381 (real) conv-host program.  This is fine, except 
382 CCS clients connect via conv-host; so for CCS
383 on non-net- versions of charm++, node 0 carries
384 out the CCS forwarding normally done in conv-host.
385
386 CCS works by listening to a TCP connection on a 
387 port-- the Ccs server socket.  A typical communcation
388 pattern is:
389
390 1.) Random program (CCS client) from the net
391 connects to the CCS server socket and sends
392 a CCS request.
393
394 2.) Node 0 forwards the request to the proper
395 PE as a regular converse message (built in CcsImpl_netReq)
396 for CcsHandleRequest.
397
398 3.) CcsHandleRequest looks up the user's pre-registered
399 CCS handler, and passes the user's handler the request data.
400
401 4.) The user's handler calls CcsSendReply with some
402 reply data; OR finishes without calling CcsSendReply,
403 in which case CcsHandleRequest does it.
404
405 5.) CcsSendReply forwards the reply back to node 0,
406 which sends the reply back to the original requestor,
407 on the (still-open) request socket.
408  */
409
410 /**
411 Send a Ccs reply back to the requestor, down the given socket.
412 Since there is no conv-host, node 0 does all the CCS 
413 communication-- this means all requests come to node 0
414 and are forwarded out; all replies are forwarded back to node 0.
415
416 Note: on Net- versions, CcsImpl_reply is implemented in machine.c
417 */
418 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
419 {
420   const int repPE=0;
421   rep->len=ChMessageInt_new(repLen);
422   if (CmiMyPe()==repPE) {
423     /*Actually deliver reply data*/
424     CcsServer_sendReply(rep,repLen,repData);
425   } else {
426     /*Forward data & socket # to the replyPE*/
427     int len=CmiReservedHeaderSize+
428            sizeof(CcsImplHeader)+repLen;
429     char *msg=CmiAlloc(len);
430     char *r=msg+CmiReservedHeaderSize;
431     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
432     memcpy(r,repData,repLen);
433     CmiSetHandler(msg,rep_fw_handler_idx);
434     CmiSyncSendAndFree(repPE,len,msg);
435   }
436 }
437
438 /*No request will be sent through this socket.
439 Closes it.
440 */
441 /*void CcsImpl_noReply(CcsImplHeader *hdr)
442 {
443   int fd=ChMessageInt(hdr->replyFd);
444   skt_close(fd);
445 }*/
446
447 /**
448  * This is the entrance point of a CCS request into the server.
449  * It is executed only on proc 0, and it forwards the request to the appropriate PE.
450  */
451 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
452 {
453   char *msg;
454   int len,repPE=ChMessageInt(hdr->pe);
455   if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
456 #if ! CMK_BIGSIM_CHARM
457     /*Treat out of bound values as errors. Helps detecting bugs*/
458     if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
459     else CmiPrintf("Invalid processor index in CCS request.");
460     CpvAccess(ccsReq)=hdr;
461     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
462     return;
463 #endif
464   }
465
466   msg=CcsImpl_ccs2converse(hdr,reqData,&len);
467   if (repPE >= 0) {
468     /* The following %CmiNumPes() follows the assumption that in BigSim the mapping is round-robin */
469     //CmiPrintf("CCS message received for %d\n",repPE);
470     CmiSyncSendAndFree(repPE%CmiNumPes(),len,msg);
471   } else if (repPE == -1) {
472     /* Broadcast to all processors */
473     //CmiPrintf("CCS broadcast received\n");
474     CmiSyncSendAndFree(0,len,msg);
475   } else {
476     /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
477     int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
478     /* The following %CmiNumPes() follows the assumption that in BigSim the mapping is round-robin */
479     //CmiPrintf("CCS multicast received\n");
480     CmiSyncSendAndFree(firstPE%CmiNumPes(),len,msg);
481   }
482 }
483
484 /*
485 We have to run a CCS server socket here on
486 node 0.  To keep the speed impact minimal,
487 we only probe for new connections (with CcsServerCheck)
488 occasionally.  
489  */
490 #include <signal.h>
491 #include "ccs-server.c" /*Include implementation here in this case*/
492 #include "ccs-auth.c"
493
494 /*Check for ready Ccs messages:*/
495 void CcsServerCheck(void)
496 {
497   while (1==skt_select1(CcsServer_fd(),0)) {
498     CcsImplHeader hdr;
499     void *data;
500     /* printf("Got CCS connect...\n"); */
501     if (CcsServer_recvRequest(&hdr,&data))
502     {/*We got a network request*/
503       /* printf("Got CCS request...\n"); */
504       if (! check_stdio_header(&hdr)) {
505         CcsImpl_netRequest(&hdr,data);
506       }
507       free(data);
508     }
509   }
510 }
511
512 #endif /*NODE_0_IS_CONVHOST*/
513
514 int _isCcsHandlerIdx(int hIdx) {
515   if (hIdx==_ccsHandlerIdx) return 1;
516   if (hIdx==rep_fw_handler_idx) return 1;
517   return 0;
518 }
519
520 void CcsBuiltinsInit(char **argv);
521
522 CpvDeclare(int, cmiArgDebugFlag);
523 CpvDeclare(char *, displayArgument);
524 CpvDeclare(int, cpdSuspendStartup);
525
526 void CcsInit(char **argv)
527 {
528   CpvInitialize(CkHashtable_c, ccsTab);
529   CpvAccess(ccsTab) = CkCreateHashtable_string(sizeof(CcsHandlerRec),5);
530   CpvInitialize(CcsImplHeader *, ccsReq);
531   CpvAccess(ccsReq) = NULL;
532   _ccsHandlerIdx = CmiRegisterHandler((CmiHandler)req_fw_handler);
533 #if CMK_BIGSIM_CHARM
534   CpvInitialize(int, _bgCcsHandlerIdx);
535   CpvAccess(_bgCcsHandlerIdx) = 0;
536   CpvInitialize(int, _bgCcsAck);
537   CpvAccess(_bgCcsAck) = 0;
538 #endif
539   CpvInitialize(int, cmiArgDebugFlag);
540   CpvInitialize(char *, displayArgument);
541   CpvInitialize(int, cpdSuspendStartup);
542   CpvAccess(cmiArgDebugFlag) = 0;
543   CpvAccess(displayArgument) = NULL;
544   CpvAccess(cpdSuspendStartup) = 0;
545   
546   CcsBuiltinsInit(argv);
547
548   rep_fw_handler_idx = CmiRegisterHandler((CmiHandler)rep_fw_handler);
549 #if NODE_0_IS_CONVHOST
550 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
551   print_fw_handler_idx = CmiRegisterHandler((CmiHandler)print_fw_handler);
552 #endif
553   {
554    int ccs_serverPort=0;
555    char *ccs_serverAuth=NULL;
556    
557    if (CmiGetArgFlagDesc(argv,"++server", "Create a CCS server port") | 
558       CmiGetArgIntDesc(argv,"++server-port",&ccs_serverPort, "Listen on this TCP/IP port number") |
559       CmiGetArgStringDesc(argv,"++server-auth",&ccs_serverAuth, "Use this CCS authentication file")) 
560     if (CmiMyPe()==0)
561     {/*Create and occasionally poll on a CCS server port*/
562       CcsServer_new(NULL,&ccs_serverPort,ccs_serverAuth);
563       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)CcsServerCheck,NULL);
564     }
565   }
566 #endif
567   /* if in parallel debug mode i.e ++cpd, freeze */
568   if (CmiGetArgFlagDesc(argv, "+cpd", "Used *only* in conjunction with parallel debugger"))
569   {
570      CpvAccess(cmiArgDebugFlag) = 1;
571      if (CmiGetArgStringDesc(argv, "+DebugDisplay",&(CpvAccess(displayArgument)), "X display for gdb used only in cpd mode"))
572      {
573         if (CpvAccess(displayArgument) == NULL)
574             CmiPrintf("WARNING> NULL parameter for +DebugDisplay\n***");
575      }
576      else if (CmiMyPe() == 0)
577      {
578             /* only one processor prints the warning */
579             CmiPrintf("WARNING> x term for gdb needs to be specified as +DebugDisplay by debugger\n***\n");
580      }
581
582      if (CmiGetArgFlagDesc(argv, "+DebugSuspend", "Suspend execution at beginning of program")) {
583        CpvAccess(cpdSuspendStartup) = 1;
584      }
585   }
586
587   CcsReleaseMessages();
588 }
589
590 #endif /*CMK_CCS_AVAILABLE*/
591