Added new function to return a reply if no reply has already been sent, without abort...
[charm.git] / src / conv-ccs / conv-ccs.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <errno.h>
11 #include <string.h>
12
13 #include "converse.h"
14 #include "conv-ccs.h"
15 #include "ccs-server.h"
16 #include "sockRoutines.h"
17 #include "queueing.h"
18
19 #if CMK_CCS_AVAILABLE
20
21 /*****************************************************************************
22  *
23  * Converse Client-Server Functions
24  *
25  *****************************************************************************/
26  
27 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
28   if (strlen(name)>=CCS_MAXHANDLER) 
29         CmiAbort("CCS handler names cannot exceed 32 characters");
30   c->name=strdup(name);
31   c->fn=NULL;
32   c->fnOld=NULL;
33   c->userPtr=NULL;
34   c->mergeFn=NULL;
35   c->nCalls=0;
36 }
37
38 static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
39         c->nCalls++;
40         if (c->fnOld) 
41         { /* Backward compatability version:
42             Pack user data into a converse message (cripes! why bother?);
43             user will delete the message. 
44           */
45                 char *cmsg = (char *) CmiAlloc(CmiReservedHeaderSize+reqLen);
46                 memcpy(cmsg+CmiReservedHeaderSize, reqData, reqLen);
47                 (c->fnOld)(cmsg);
48         }
49         else { /* Pass read-only copy of data straight to user */
50                 (c->fn)(c->userPtr, reqLen, reqData);
51         }
52 }
53
54 /*Table maps handler name to CcsHandler object*/
55 CpvDeclare(CcsHandlerTable, ccsTab);
56
57 CpvStaticDeclare(CcsImplHeader*,ccsReq);/*Identifies CCS requestor (client)*/
58
59 void CcsRegisterHandler(const char *name, CmiHandler fn) {
60   CcsHandlerRec cp;
61   initHandlerRec(&cp,name);
62   cp.fnOld=fn;
63   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
64 }
65 void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
66   CcsHandlerRec cp;
67   initHandlerRec(&cp,name);
68   cp.fn=fn;
69   cp.userPtr=ptr;
70   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
71 }
72 CcsHandlerRec *CcsGetHandler(const char *name) {
73   return CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
74 }
75 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
76   CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
77   if (rec==NULL) {
78     CmiAbort("CCS: Unknown CCS handler name.\n");
79   }
80   rec->mergeFn=newMerge;
81   rec->redID=CmiGetGlobalReduction();
82 }
83
84 void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
85   CcsImplHeader *hdr;
86   int total = *size;
87   void *reply;
88   char *ptr;
89   int i;
90   for (i=0; i<n; ++i) {
91     hdr = (CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize);
92     total += ChMessageInt(hdr->len);
93   }
94   reply = CmiAlloc(total);
95   memcpy(reply, local, *size);
96   ((CcsImplHeader*)(((char*)reply)+CmiReservedHeaderSize))->len = ChMessageInt_new(total-CmiReservedHeaderSize-sizeof(CcsImplHeader));
97   CmiFree(local);
98   ptr = ((char*)reply)+*size;
99   for (i=0; i<n; ++i) {
100     int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize))->len);
101     memcpy(ptr, ((char*)remote[i])+CmiReservedHeaderSize+sizeof(CcsImplHeader), len);
102     ptr += len;
103   }
104   *size = total;
105   return reply;
106 }
107
108 #define SIMPLE_REDUCTION(name, dataType, loop) \
109 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
110   int i, m; \
111   CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiReservedHeaderSize); \
112   int lenLocal = ChMessageInt(hdrLocal->len); \
113   int nElem = lenLocal / sizeof(dataType); \
114   dataType *ret = (dataType *) (hdrLocal+1); \
115   CcsImplHeader *hdr; \
116   for (m=0; m<n; ++m) { \
117     int len; \
118     dataType *value; \
119     hdr = (CcsImplHeader*)(((char*)remote[m])+CmiReservedHeaderSize); \
120     len = ChMessageInt(hdr->len); \
121     value = (dataType *)(hdr+1); \
122     CmiAssert(lenLocal == len); \
123     for (i=0; i<nElem; ++i) loop; \
124   } \
125   return local; \
126 }
127
128 SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
129 SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
130 SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
131 SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
132
133 /*Use this macro for reductions that have the same type for all inputs */
134 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
135   SIMPLE_REDUCTION(nameBase##_int, int, loop) \
136   SIMPLE_REDUCTION(nameBase##_float, float, loop) \
137   SIMPLE_REDUCTION(nameBase##_double, double, loop)
138
139 SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
140 SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
141 SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
142 SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
143
144 #undef SIMPLE_REDUCTION
145 #undef SIMPLE_POLYMORPH_REDUCTION
146
147 int CcsEnabled(void)
148 {
149   return 1;
150 }
151
152 int CcsIsRemoteRequest(void)
153 {
154   return CpvAccess(ccsReq)!=NULL;
155 }
156
157 void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
158 {
159   *pip = CpvAccess(ccsReq)->attr.ip;
160   *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
161 }
162
163 int rep_fw_handler_idx;
164
165 CcsDelayedReply CcsDelayReply(void)
166 {
167   CcsDelayedReply ret;
168   int len = sizeof(CcsImplHeader);
169   if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
170     len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
171   ret.hdr = (CcsImplHeader*)malloc(len);
172   memcpy(ret.hdr, CpvAccess(ccsReq), len);
173   CpvAccess(ccsReq)=NULL;
174   return ret;
175 }
176
177 void CcsSendReply(int replyLen, const void *replyData)
178 {
179   if (CpvAccess(ccsReq)==NULL)
180     CmiAbort("CcsSendReply: reply already sent!\n");
181   CpvAccess(ccsReq)->len = ChMessageInt_new(1);
182   CcsReply(CpvAccess(ccsReq),replyLen,replyData);
183   CpvAccess(ccsReq) = NULL;
184 }
185
186 void CcsSendReplyNoError(int replyLen, const void *replyData) {
187   if (CpvAccess(ccsReq)==NULL) return;
188   CcsSendReply(replyLen, replyData);
189 }
190
191 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
192 {
193   CcsImplHeader *h = d.hdr;
194   h->len=ChMessageInt_new(1);
195   CcsReply(h,replyLen,replyData);
196   free(h);
197 }
198
199 void CcsNoReply()
200 {
201   if (CpvAccess(ccsReq)==NULL) return;
202   CpvAccess(ccsReq)->len = ChMessageInt_new(0);
203   CcsReply(CpvAccess(ccsReq),0,NULL);
204   CpvAccess(ccsReq) = NULL;
205 }
206
207 void CcsNoDelayedReply(CcsDelayedReply d)
208 {
209   CcsImplHeader *h = d.hdr;
210   h->len = ChMessageInt_new(0);
211   CcsReply(h,0,NULL);
212   free(h);
213 }
214
215
216 /**********************************
217 _CCS Implementation Routines:
218   These do the request forwarding and
219 delivery.
220 ***********************************/
221
222 /*CCS Bottleneck:
223   Deliver the given message data to the given
224 CCS handler.
225 */
226 void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
227 {
228   char *cmsg;
229   int reqLen=ChMessageInt(hdr->len);
230 /*Look up handler's converse ID*/
231   char *handlerStr=hdr->handler;
232   CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
233   if (fn==NULL) {
234     CmiPrintf("CCS: Unknown CCS handler name '%s' requested. Ignoring...\n",
235               hdr->handler);
236     CpvAccess(ccsReq)=hdr;
237     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
238     return;
239  /*   CmiAbort("CCS: Unknown CCS handler name.\n");*/
240   }
241
242 /* Call the handler */
243   CpvAccess(ccsReq)=hdr;
244 #if CMK_CHARMDEBUG
245   if (conditionalPipe[1]!=0 && _conditionalDelivery==0) {
246     /* We are conditionally delivering, the message has been sent to the child, wait for its response */
247     int bytes;
248     if (4==read(conditionalPipe[0], &bytes, 4)) {
249       char *buf = malloc(bytes);
250       read(conditionalPipe[0], buf, bytes);
251       CcsSendReply(bytes,buf);
252       free(buf);
253     } else {
254       /* the pipe has been closed */
255       CpdEndConditionalDeliver_master();
256    }
257   }
258   else
259 #endif
260   {
261     callHandlerRec(fn,reqLen,reqData);
262   
263 /*Check if a reply was sent*/
264     if (CpvAccess(ccsReq)!=NULL)
265       CcsSendReply(0,NULL);/*Send an empty reply if not*/
266   }
267 }
268
269 #if ! NODE_0_IS_CONVHOST || CMK_BLUEGENE_CHARM
270 /* The followings are necessary to prevent CCS requests to be processed before
271  * CCS has been initialized. Really it matters only when NODE_0_IS_CONVHOST=0, but
272  * it doesn't hurt having it in the other case as well */
273 static char **bufferedMessages = NULL;
274 static int CcsNumBufferedMsgs = 0;
275 #define CCS_MAX_NUM_BUFFERED_MSGS  100
276
277 void CcsBufferMessage(char *msg) {
278   CmiPrintf("Buffering CCS message\n");
279   CmiAssert(CcsNumBufferedMsgs < CCS_MAX_NUM_BUFFERED_MSGS);
280   if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
281   if (bufferedMessages == NULL) bufferedMessages = malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
282   bufferedMessages[CcsNumBufferedMsgs] = msg;
283   CcsNumBufferedMsgs ++;
284 }
285 #endif
286   
287 /*Unpacks request message to call above routine*/
288 int _ccsHandlerIdx = 0;/*Converse handler index of routine req_fw_handler*/
289
290 #if CMK_BLUEGENE_CHARM
291 CpvDeclare(int, _bgCcsHandlerIdx);
292 CpvDeclare(int, _bgCcsAck);
293 /* This routine is needed when the application is built on top of the bigemulator
294  * layer of Charm. In this case, the real CCS handler must be called within a
295  * worker thread. The function of this function is to receive the CCS message in
296  * the bottom converse layer and forward it to the emulated layer. */
297 static void bg_req_fw_handler(char *msg) {
298   if (CpvAccess(_bgCcsAck) < BgNodeSize()) {
299     CcsBufferMessage(msg);
300     return;
301   }
302   //CmiPrintf("CCS scheduling message\n");
303   /* Get out of the message who is the destination pe */
304   int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
305   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
306   int destPE = (int)ChMessageInt(hdr->pe);
307   if (destPE == -1) destPE = 0;
308   if (destPE < -1) {
309     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
310     destPE = ChMessageInt(pes_nbo[0]);
311   }
312   //CmiAssert(destPE >= 0); // FixME: should cover also broadcast and multicast -> create generic function to extract destpe
313   (((CmiBlueGeneMsgHeader*)msg)->tID) = 0;
314   (((CmiBlueGeneMsgHeader*)msg)->n) = 0;
315   (((CmiBlueGeneMsgHeader*)msg)->flag) = 0;
316   (((CmiBlueGeneMsgHeader*)msg)->t) = 0;
317   (((CmiBlueGeneMsgHeader*)msg)->hID) = CpvAccess(_bgCcsHandlerIdx);
318   /* Get the right thread to deliver to (for now assume it is using CyclicMapInfo) */
319   addBgNodeInbuffer(msg, destPE/CmiNumPes());
320   //CmiPrintf("message CCS added %d to %d\n",((CmiBlueGeneMsgHeader*)msg)->hID, ((CmiBlueGeneMsgHeader*)msg)->tID);
321 }
322 #define req_fw_handler bg_req_fw_handler
323 #endif
324 extern void req_fw_handler(char *msg);
325
326 void CcsReleaseMessages() {
327 #if ! NODE_0_IS_CONVHOST || CMK_BLUEGENE_CHARM
328 #if CMK_BLUEGENE_CHARM
329   if (CpvAccess(_bgCcsAck) == 0 || CpvAccess(_bgCcsAck) < BgNodeSize()) return;
330 #endif
331   if (CcsNumBufferedMsgs > 0) {
332     int i;
333     //CmiPrintf("CCS: %d messages released\n",CcsNumBufferedMsgs);
334     for (i=0; i<CcsNumBufferedMsgs; ++i) {
335       CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
336       CsdEnqueue(bufferedMessages[i]);
337     }
338     free(bufferedMessages);
339     bufferedMessages = NULL;
340     CcsNumBufferedMsgs = -1;
341   }
342 #endif
343 }
344
345 /*Convert CCS header & message data into a converse message 
346  addressed to handler*/
347 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
348 {
349   int reqLen=ChMessageInt(hdr->len);
350   int destPE = ChMessageInt(hdr->pe);
351   int len;
352   char *msg;
353   if (destPE < -1) reqLen -= destPE*sizeof(int);
354   len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+reqLen;
355   msg=(char *)CmiAlloc(len);
356   memcpy(msg+CmiReservedHeaderSize,hdr,sizeof(CcsImplHeader));
357   memcpy(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader),data,reqLen);
358   if (ret_len!=NULL) *ret_len=len;
359   if (_ccsHandlerIdx != 0) {
360     CmiSetHandler(msg, _ccsHandlerIdx);
361     return msg;
362   } else {
363 #if NODE_0_IS_CONVHOST
364     CmiAbort("Why do we need to buffer messages when node 0 is Convhost?");
365 #else
366     CcsBufferMessage(msg);
367     return NULL;
368 #endif
369   }
370 }
371
372 /*Receives reply messages passed up from
373 converse to node 0.*/
374 static void rep_fw_handler(char *msg)
375 {
376   int len;
377   char *r=msg+CmiReservedHeaderSize;
378   CcsImplHeader *hdr=(CcsImplHeader *)r; 
379   r+=sizeof(CcsImplHeader);
380   len=ChMessageInt(hdr->len);
381   CcsImpl_reply(hdr,len,r);
382   CmiFree(msg);
383 }
384
385 #if NODE_0_IS_CONVHOST
386 /************** NODE_0_IS_CONVHOST ***********
387 Non net- versions of charm++ are run without a 
388 (real) conv-host program.  This is fine, except 
389 CCS clients connect via conv-host; so for CCS
390 on non-net- versions of charm++, node 0 carries
391 out the CCS forwarding normally done in conv-host.
392
393 CCS works by listening to a TCP connection on a 
394 port-- the Ccs server socket.  A typical communcation
395 pattern is:
396
397 1.) Random program (CCS client) from the net
398 connects to the CCS server socket and sends
399 a CCS request.
400
401 2.) Node 0 forwards the request to the proper
402 PE as a regular converse message (built in CcsImpl_netReq)
403 for CcsHandleRequest.
404
405 3.) CcsHandleRequest looks up the user's pre-registered
406 CCS handler, and passes the user's handler the request data.
407
408 4.) The user's handler calls CcsSendReply with some
409 reply data; OR finishes without calling CcsSendReply,
410 in which case CcsHandleRequest does it.
411
412 5.) CcsSendReply forwards the reply back to node 0,
413 which sends the reply back to the original requestor,
414 on the (still-open) request socket.
415  */
416
417 /**
418 Send a Ccs reply back to the requestor, down the given socket.
419 Since there is no conv-host, node 0 does all the CCS 
420 communication-- this means all requests come to node 0
421 and are forwarded out; all replies are forwarded back to node 0.
422
423 Note: on Net- versions, CcsImpl_reply is implemented in machine.c
424 */
425 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
426 {
427   const int repPE=0;
428   rep->len=ChMessageInt_new(repLen);
429   if (CmiMyPe()==repPE) {
430     /*Actually deliver reply data*/
431     CcsServer_sendReply(rep,repLen,repData);
432   } else {
433     /*Forward data & socket # to the replyPE*/
434     int len=CmiReservedHeaderSize+
435            sizeof(CcsImplHeader)+repLen;
436     char *msg=CmiAlloc(len);
437     char *r=msg+CmiReservedHeaderSize;
438     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
439     memcpy(r,repData,repLen);
440     CmiSetHandler(msg,rep_fw_handler_idx);
441     CmiSyncSendAndFree(repPE,len,msg);
442   }
443 }
444
445 /*No request will be sent through this socket.
446 Closes it.
447 */
448 /*void CcsImpl_noReply(CcsImplHeader *hdr)
449 {
450   int fd=ChMessageInt(hdr->replyFd);
451   skt_close(fd);
452 }*/
453
454 /**
455  * This is the entrance point of a CCS request into the server.
456  * It is executed only on proc 0, and it forwards the request to the appropriate PE.
457  */
458 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
459 {
460   char *msg;
461   int len,repPE=ChMessageInt(hdr->pe);
462   if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
463 #if ! CMK_BLUEGENE_CHARM
464     /*Treat out of bound values as errors. Helps detecting bugs*/
465     if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
466     else CmiPrintf("Invalid processor index in CCS request.");
467     CpvAccess(ccsReq)=hdr;
468     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
469     return;
470 #endif
471   }
472
473   msg=CcsImpl_ccs2converse(hdr,reqData,&len);
474   if (repPE >= 0) {
475     /* The following %CmiNumPes() follows the assumption that in BigSim the mapping is round-robin */
476     //CmiPrintf("CCS message received for %d\n",repPE);
477     CmiSyncSendAndFree(repPE%CmiNumPes(),len,msg);
478   } else if (repPE == -1) {
479     /* Broadcast to all processors */
480     //CmiPrintf("CCS broadcast received\n");
481     CmiSyncSendAndFree(0,len,msg);
482   } else {
483     /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
484     int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
485     /* The following %CmiNumPes() follows the assumption that in BigSim the mapping is round-robin */
486     //CmiPrintf("CCS multicast received\n");
487     CmiSyncSendAndFree(firstPE%CmiNumPes(),len,msg);
488   }
489 }
490
491 /*
492 We have to run a CCS server socket here on
493 node 0.  To keep the speed impact minimal,
494 we only probe for new connections (with CcsServerCheck)
495 occasionally.  
496  */
497 #include <signal.h>
498 #include "ccs-server.c" /*Include implementation here in this case*/
499 #include "ccs-auth.c"
500
501 /*Check for ready Ccs messages:*/
502 void CcsServerCheck(void)
503 {
504   while (1==skt_select1(CcsServer_fd(),0)) {
505     CcsImplHeader hdr;
506     void *data;
507     /* printf("Got CCS connect...\n"); */
508     if (CcsServer_recvRequest(&hdr,&data))
509     {/*We got a network request*/
510       /* printf("Got CCS request...\n"); */
511       if (! check_stdio_header(&hdr)) {
512         CcsImpl_netRequest(&hdr,data);
513       }
514       free(data);
515     }
516   }
517 }
518
519 #endif /*NODE_0_IS_CONVHOST*/
520
521 int _isCcsHandlerIdx(int hIdx) {
522   if (hIdx==_ccsHandlerIdx) return 1;
523   if (hIdx==rep_fw_handler_idx) return 1;
524   return 0;
525 }
526
527 void CcsBuiltinsInit(char **argv);
528
529 CpvDeclare(int, cmiArgDebugFlag);
530 CpvDeclare(char *, displayArgument);
531 CpvDeclare(int, cpdSuspendStartup);
532
533 void CcsInit(char **argv)
534 {
535   CpvInitialize(CkHashtable_c, ccsTab);
536   CpvAccess(ccsTab) = CkCreateHashtable_string(sizeof(CcsHandlerRec),5);
537   CpvInitialize(CcsImplHeader *, ccsReq);
538   CpvAccess(ccsReq) = NULL;
539   _ccsHandlerIdx = CmiRegisterHandler((CmiHandler)req_fw_handler);
540 #if CMK_BLUEGENE_CHARM
541   CpvInitialize(int, _bgCcsHandlerIdx);
542   CpvAccess(_bgCcsHandlerIdx) = 0;
543   CpvInitialize(int, _bgCcsAck);
544   CpvAccess(_bgCcsAck) = 0;
545 #endif
546   CpvInitialize(int, cmiArgDebugFlag);
547   CpvInitialize(char *, displayArgument);
548   CpvInitialize(int, cpdSuspendStartup);
549   CpvAccess(cmiArgDebugFlag) = 0;
550   CpvAccess(displayArgument) = NULL;
551   CpvAccess(cpdSuspendStartup) = 0;
552   
553   CcsBuiltinsInit(argv);
554
555   rep_fw_handler_idx = CmiRegisterHandler((CmiHandler)rep_fw_handler);
556 #if NODE_0_IS_CONVHOST
557 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
558   print_fw_handler_idx = CmiRegisterHandler((CmiHandler)print_fw_handler);
559 #endif
560   {
561    int ccs_serverPort=0;
562    char *ccs_serverAuth=NULL;
563    
564    if (CmiGetArgFlagDesc(argv,"++server", "Create a CCS server port") | 
565       CmiGetArgIntDesc(argv,"++server-port",&ccs_serverPort, "Listen on this TCP/IP port number") |
566       CmiGetArgStringDesc(argv,"++server-auth",&ccs_serverAuth, "Use this CCS authentication file")) 
567     if (CmiMyPe()==0)
568     {/*Create and occasionally poll on a CCS server port*/
569       CcsServer_new(NULL,&ccs_serverPort,ccs_serverAuth);
570       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)CcsServerCheck,NULL);
571     }
572   }
573 #endif
574   /* if in parallel debug mode i.e ++cpd, freeze */
575   if (CmiGetArgFlagDesc(argv, "+cpd", "Used *only* in conjunction with parallel debugger"))
576   {
577      CpvAccess(cmiArgDebugFlag) = 1;
578      if (CmiGetArgStringDesc(argv, "+DebugDisplay",&(CpvAccess(displayArgument)), "X display for gdb used only in cpd mode"))
579      {
580         if (CpvAccess(displayArgument) == NULL)
581             CmiPrintf("WARNING> NULL parameter for +DebugDisplay\n***");
582      }
583      else if (CmiMyPe() == 0)
584      {
585             /* only one processor prints the warning */
586             CmiPrintf("WARNING> x term for gdb needs to be specified as +DebugDisplay by debugger\n***\n");
587      }
588
589      if (CmiGetArgFlagDesc(argv, "+DebugSuspend", "Suspend execution at beginning of program")) {
590        CpvAccess(cpdSuspendStartup) = 1;
591      }
592   }
593
594   CcsReleaseMessages();
595 }
596
597 #endif /*CMK_CCS_AVAILABLE*/
598