4571f4fec4487bf2f47c44572c4fbb101503d5b3
[charm.git] / src / conv-ccs / conv-ccs.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <errno.h>
11 #include <string.h>
12
13 #include "converse.h"
14 #include "conv-ccs.h"
15 #include "ccs-server.h"
16 #include "sockRoutines.h"
17 #include "queueing.h"
18
19 #if CMK_CCS_AVAILABLE
20
21 /*****************************************************************************
22  *
23  * Converse Client-Server Functions
24  *
25  *****************************************************************************/
26  
27 #include "ckhashtable.h"
28
29 /* Includes all information stored about a single CCS handler. */
30 typedef struct CcsHandlerRec {
31         const char *name; /*Name passed over socket*/
32         CmiHandler fnOld; /*Old converse-style handler, or NULL if new-style*/
33         CcsHandlerFn fn; /*New-style handler function, or NULL if old-style*/
34         void *userPtr;
35         CmiReduceMergeFn mergeFn; /*Merge function used for bcast requests*/
36         int nCalls; /* Number of times handler has been executed*/
37         CmiUInt2 redID; /*Reduction ID to be used with CmiListReduce*/
38 } CcsHandlerRec;
39
40 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
41   if (strlen(name)>=CCS_MAXHANDLER) 
42         CmiAbort("CCS handler names cannot exceed 32 characters");
43   c->name=strdup(name);
44   c->fn=NULL;
45   c->fnOld=NULL;
46   c->userPtr=NULL;
47   c->mergeFn=NULL;
48   c->nCalls=0;
49 }
50
51 static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
52         c->nCalls++;
53         if (c->fnOld) 
54         { /* Backward compatability version:
55             Pack user data into a converse message (cripes! why bother?);
56             user will delete the message. 
57           */
58                 char *cmsg = (char *) CmiAlloc(CmiReservedHeaderSize+reqLen);
59                 memcpy(cmsg+CmiReservedHeaderSize, reqData, reqLen);
60                 (c->fnOld)(cmsg);
61         }
62         else { /* Pass read-only copy of data straight to user */
63                 (c->fn)(c->userPtr, reqLen, reqData);
64         }
65 }
66
67 /*Table maps handler name to CcsHandler object*/
68 typedef CkHashtable_c CcsHandlerTable;
69 CpvStaticDeclare(CcsHandlerTable, ccsTab);
70
71 CpvStaticDeclare(CcsImplHeader*,ccsReq);/*Identifies CCS requestor (client)*/
72
73 void CcsRegisterHandler(const char *name, CmiHandler fn) {
74   CcsHandlerRec cp;
75   initHandlerRec(&cp,name);
76   cp.fnOld=fn;
77   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
78 }
79 void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
80   CcsHandlerRec cp;
81   initHandlerRec(&cp,name);
82   cp.fn=fn;
83   cp.userPtr=ptr;
84   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
85 }
86 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
87   CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
88   if (rec==NULL) {
89     CmiAbort("CCS: Unknown CCS handler name.\n");
90   }
91   rec->mergeFn=newMerge;
92   rec->redID=CmiGetGlobalReduction();
93 }
94
95 void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
96   CcsImplHeader *hdr;
97   int total = *size;
98   void *reply;
99   char *ptr;
100   int i;
101   for (i=0; i<n; ++i) {
102     hdr = (CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize);
103     total += ChMessageInt(hdr->len);
104   }
105   reply = CmiAlloc(total);
106   memcpy(reply, local, *size);
107   ((CcsImplHeader*)(((char*)reply)+CmiReservedHeaderSize))->len = ChMessageInt_new(total-CmiReservedHeaderSize-sizeof(CcsImplHeader));
108   CmiFree(local);
109   ptr = ((char*)reply)+*size;
110   for (i=0; i<n; ++i) {
111     int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize))->len);
112     memcpy(ptr, ((char*)remote[i])+CmiReservedHeaderSize+sizeof(CcsImplHeader), len);
113     ptr += len;
114   }
115   *size = total;
116   return reply;
117 }
118
119 #define SIMPLE_REDUCTION(name, dataType, loop) \
120 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
121   int i, m; \
122   CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiReservedHeaderSize); \
123   int lenLocal = ChMessageInt(hdrLocal->len); \
124   int nElem = lenLocal / sizeof(dataType); \
125   dataType *ret = (dataType *) (hdrLocal+1); \
126   CcsImplHeader *hdr; \
127   for (m=0; m<n; ++m) { \
128     int len; \
129     dataType *value; \
130     hdr = (CcsImplHeader*)(((char*)remote[m])+CmiReservedHeaderSize); \
131     len = ChMessageInt(hdr->len); \
132     value = (dataType *)(hdr+1); \
133     CmiAssert(lenLocal == len); \
134     for (i=0; i<nElem; ++i) loop; \
135   } \
136   return local; \
137 }
138
139 SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
140 SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
141 SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
142 SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
143
144 /*Use this macro for reductions that have the same type for all inputs */
145 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
146   SIMPLE_REDUCTION(nameBase##_int, int, loop) \
147   SIMPLE_REDUCTION(nameBase##_float, float, loop) \
148   SIMPLE_REDUCTION(nameBase##_double, double, loop)
149
150 SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
151 SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
152 SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
153 SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
154
155 #undef SIMPLE_REDUCTION
156 #undef SIMPLE_POLYMORPH_REDUCTION
157
158 int CcsEnabled(void)
159 {
160   return 1;
161 }
162
163 int CcsIsRemoteRequest(void)
164 {
165   return CpvAccess(ccsReq)!=NULL;
166 }
167
168 void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
169 {
170   *pip = CpvAccess(ccsReq)->attr.ip;
171   *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
172 }
173
174 static int rep_fw_handler_idx;
175
176 /**
177  * Decide if the reply is ready to be forwarded to the waiting client,
178  * or if combination is required (for broadcast/multicast CCS requests.
179  */
180 int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
181   int repPE = (int)ChMessageInt(rep->pe);
182   if (repPE <= -1) {
183     /* Reduce the message to get the final reply */
184     CcsHandlerRec *fn;
185     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+repLen;
186     char *msg=CmiAlloc(len);
187     char *r=msg+CmiReservedHeaderSize;
188     char *handlerStr;
189     rep->len = ChMessageInt_new(repLen);
190     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
191     memcpy(r,repData,repLen);
192     CmiSetHandler(msg,rep_fw_handler_idx);
193     handlerStr=rep->handler;
194     fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
195     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
196     if (repPE == -1) {
197       /* CCS Broadcast */
198       CmiReduce(msg, len, fn->mergeFn);
199     } else {
200       /* CCS Multicast */
201       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
202     }
203   } else {
204     CcsImpl_reply(rep, repLen, repData);
205   }
206 }
207
208 CcsDelayedReply CcsDelayReply(void)
209 {
210   CcsDelayedReply ret;
211   int len = sizeof(CcsImplHeader);
212   if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
213     len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
214   ret.hdr = (CcsImplHeader*)malloc(len);
215   memcpy(ret.hdr, CpvAccess(ccsReq), len);
216   CpvAccess(ccsReq)=NULL;
217   return ret;
218 }
219
220 void CcsSendReply(int replyLen, const void *replyData)
221 {
222   if (CpvAccess(ccsReq)==NULL)
223     CmiAbort("CcsSendReply: reply already sent!\n");
224   CpvAccess(ccsReq)->len = ChMessageInt_new(1);
225   CcsReply(CpvAccess(ccsReq),replyLen,replyData);
226   CpvAccess(ccsReq) = NULL;
227 }
228
229 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
230 {
231   CcsImplHeader *h = d.hdr;
232   h->len=ChMessageInt_new(1);
233   CcsReply(h,replyLen,replyData);
234   free(h);
235 }
236
237 void CcsNoReply()
238 {
239   if (CpvAccess(ccsReq)==NULL) return;
240   CpvAccess(ccsReq)->len = ChMessageInt_new(0);
241   CcsReply(CpvAccess(ccsReq),0,NULL);
242   CpvAccess(ccsReq) = NULL;
243 }
244
245 void CcsNoDelayedReply(CcsDelayedReply d)
246 {
247   CcsImplHeader *h = d.hdr;
248   h->len = ChMessageInt_new(0);
249   CcsReply(h,0,NULL);
250   free(h);
251 }
252
253
254 /**********************************
255 _CCS Implementation Routines:
256   These do the request forwarding and
257 delivery.
258 ***********************************/
259
260 /*CCS Bottleneck:
261   Deliver the given message data to the given
262 CCS handler.
263 */
264 static void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
265 {
266   char *cmsg;
267   int reqLen=ChMessageInt(hdr->len);
268 /*Look up handler's converse ID*/
269   char *handlerStr=hdr->handler;
270   CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
271   if (fn==NULL) {
272     CmiPrintf("CCS: Unknown CCS handler name '%s' requested. Ignoring...\n",
273               hdr->handler);
274     CpvAccess(ccsReq)=hdr;
275     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
276     return;
277  /*   CmiAbort("CCS: Unknown CCS handler name.\n");*/
278   }
279
280 /* Call the handler */
281   CpvAccess(ccsReq)=hdr;
282   callHandlerRec(fn,reqLen,reqData);
283   
284 /*Check if a reply was sent*/
285   if (CpvAccess(ccsReq)!=NULL)
286     CcsSendReply(0,NULL);/*Send an empty reply if not*/
287 }
288
289 /*Unpacks request message to call above routine*/
290 int _ccsHandlerIdx = 0;/*Converse handler index of below routine*/
291 static void req_fw_handler(char *msg)
292 {
293   int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
294   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
295   int destPE = (int)ChMessageInt(hdr->pe);
296   if (CmiMyPe() == 0 && destPE == -1) {
297     /* Broadcast message to all other processors */
298     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len);
299     CmiSyncBroadcast(len, msg);
300   }
301   else if (destPE < -1) {
302     /* Multicast the message to your children */
303     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(ChMessageInt_t);
304     int index, child, i;
305     int *pes = (int*)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
306     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)pes;
307     offset -= destPE * sizeof(ChMessageInt_t);
308     if (ChMessageInt(pes_nbo[0]) == CmiMyPe()) {
309       for (index=0; index<-destPE; ++index) pes[index] = ChMessageInt(pes_nbo[index]);
310     }
311     for (index=0; index<-destPE; ++index) {
312       if (pes[index] == CmiMyPe()) break;
313     }
314     child = (index << 2) + 1;
315     for (i=0; i<4; ++i) {
316       if (child+i < -destPE) {
317         CmiSyncSend(pes[child+i], len, msg);
318       }
319     }
320   }
321   CcsHandleRequest(hdr, msg+offset);
322   CmiFree(msg);
323 }
324
325 #if ! NODE_0_IS_CONVHOST
326 /* The followings are necessary to prevent CCS requests to be processed before
327  * CCS has been initialized. Really it matters only when NODE_0_IS_CONVHOST=0, but
328  * it doesn't hurt having it in the other case as well */
329 static char **bufferedMessages = NULL;
330 static int CcsNumBufferedMsgs = 0;
331 #define CCS_MAX_NUM_BUFFERED_MSGS  100
332 #endif
333
334 void CcsBufferMessage(char *msg) {
335   //CmiPrintf("Buffering CCS message\n");
336   CmiAssert(CcsNumBufferedMsgs < CCS_MAX_NUM_BUFFERED_MSGS);
337   if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
338   if (bufferedMessages == NULL) bufferedMessages = malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
339   bufferedMessages[CcsNumBufferedMsgs] = msg;
340   CcsNumBufferedMsgs ++;
341 }
342   
343 void CcsReleaseMessages() {
344 #if ! NODE_0_IS_CONVHOST
345   if (CcsNumBufferedMsgs > 0) {
346     int i;
347     for (i=0; i<CcsNumBufferedMsgs; ++i) {
348       CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
349       CmiPushPE(0, bufferedMessages[i]);
350     }
351     free(bufferedMessages);
352     bufferedMessages = NULL;
353     CcsNumBufferedMsgs = -1;
354   }
355 #endif
356 }
357
358 /*Convert CCS header & message data into a converse message 
359  addressed to handler*/
360 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
361 {
362   int reqLen=ChMessageInt(hdr->len);
363   int destPE = ChMessageInt(hdr->pe);
364   int len;
365   char *msg;
366   if (destPE < -1) reqLen -= destPE*sizeof(int);
367   len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+reqLen;
368   msg=(char *)CmiAlloc(len);
369   memcpy(msg+CmiReservedHeaderSize,hdr,sizeof(CcsImplHeader));
370   memcpy(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader),data,reqLen);
371   if (ret_len!=NULL) *ret_len=len;
372   if (_ccsHandlerIdx != 0) {
373     CmiSetHandler(msg, _ccsHandlerIdx);
374     return msg;
375   } else {
376 #if NODE_0_IS_CONVHOST
377     CmiAbort("Why do we need to buffer messages when node 0 is Convhost?");
378 #else
379     CcsBufferMessage(msg);
380     return NULL;
381 #endif
382   }
383 }
384
385 /*Receives reply messages passed up from
386 converse to node 0.*/
387 static void rep_fw_handler(char *msg)
388 {
389   int len;
390   char *r=msg+CmiReservedHeaderSize;
391   CcsImplHeader *hdr=(CcsImplHeader *)r; 
392   r+=sizeof(CcsImplHeader);
393   len=ChMessageInt(hdr->len);
394   CcsImpl_reply(hdr,len,r);
395   CmiFree(msg);
396 }
397
398 #if NODE_0_IS_CONVHOST
399 /************** NODE_0_IS_CONVHOST ***********
400 Non net- versions of charm++ are run without a 
401 (real) conv-host program.  This is fine, except 
402 CCS clients connect via conv-host; so for CCS
403 on non-net- versions of charm++, node 0 carries
404 out the CCS forwarding normally done in conv-host.
405
406 CCS works by listening to a TCP connection on a 
407 port-- the Ccs server socket.  A typical communcation
408 pattern is:
409
410 1.) Random program (CCS client) from the net
411 connects to the CCS server socket and sends
412 a CCS request.
413
414 2.) Node 0 forwards the request to the proper
415 PE as a regular converse message (built in CcsImpl_netReq)
416 for CcsHandleRequest.
417
418 3.) CcsHandleRequest looks up the user's pre-registered
419 CCS handler, and passes the user's handler the request data.
420
421 4.) The user's handler calls CcsSendReply with some
422 reply data; OR finishes without calling CcsSendReply,
423 in which case CcsHandleRequest does it.
424
425 5.) CcsSendReply forwards the reply back to node 0,
426 which sends the reply back to the original requestor,
427 on the (still-open) request socket.
428  */
429
430 /**
431 Send a Ccs reply back to the requestor, down the given socket.
432 Since there is no conv-host, node 0 does all the CCS 
433 communication-- this means all requests come to node 0
434 and are forwarded out; all replies are forwarded back to node 0.
435
436 Note: on Net- versions, CcsImpl_reply is implemented in machine.c
437 */
438 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
439 {
440   const int repPE=0;
441   rep->len=ChMessageInt_new(repLen);
442   if (CmiMyPe()==repPE) {
443     /*Actually deliver reply data*/
444     CcsServer_sendReply(rep,repLen,repData);
445   } else {
446     /*Forward data & socket # to the replyPE*/
447     int len=CmiReservedHeaderSize+
448            sizeof(CcsImplHeader)+repLen;
449     char *msg=CmiAlloc(len);
450     char *r=msg+CmiReservedHeaderSize;
451     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
452     memcpy(r,repData,repLen);
453     CmiSetHandler(msg,rep_fw_handler_idx);
454     CmiSyncSendAndFree(repPE,len,msg);
455   }
456 }
457
458 /*No request will be sent through this socket.
459 Closes it.
460 */
461 /*void CcsImpl_noReply(CcsImplHeader *hdr)
462 {
463   int fd=ChMessageInt(hdr->replyFd);
464   skt_close(fd);
465 }*/
466
467 /**
468  * This is the entrance point of a CCS request into the server.
469  * It is executed only on proc 0, and it forwards the request to the appropriate PE.
470  */
471 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
472 {
473   char *msg;
474   int len,repPE=ChMessageInt(hdr->pe);
475   if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
476     /*Treat out of bound values as errors. Helps detecting bugs*/
477     if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
478     else CmiPrintf("Invalid processor index in CCS request.");
479     CpvAccess(ccsReq)=hdr;
480     CcsSendReply(0,NULL); /*Send an empty reply to the possibly waiting client*/
481     return;
482   }
483
484   msg=CcsImpl_ccs2converse(hdr,reqData,&len);
485   if (repPE >= 0) {
486     CmiSyncSendAndFree(repPE,len,msg);
487   } else if (repPE == -1) {
488     /* Broadcast to all processors */
489     CmiPushPE(0, msg);
490   } else {
491     /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
492     int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
493     CmiSyncSendAndFree(firstPE,len,msg);
494   }
495 }
496
497 /*
498 We have to run a CCS server socket here on
499 node 0.  To keep the speed impact minimal,
500 we only probe for new connections (with CcsServerCheck)
501 occasionally.  
502  */
503 #include <signal.h>
504 #include "ccs-server.c" /*Include implementation here in this case*/
505 #include "ccs-auth.c"
506
507 /*Check for ready Ccs messages:*/
508 void CcsServerCheck(void)
509 {
510   while (1==skt_select1(CcsServer_fd(),0)) {
511     CcsImplHeader hdr;
512     void *data;
513     /* printf("Got CCS connect...\n"); */
514     if (CcsServer_recvRequest(&hdr,&data))
515     {/*We got a network request*/
516       /* printf("Got CCS request...\n"); */
517       if (! check_stdio_header(&hdr)) {
518         CcsImpl_netRequest(&hdr,data);
519       }
520       free(data);
521     }
522   }
523 }
524
525 #endif /*NODE_0_IS_CONVHOST*/
526
527 int _isCcsHandlerIdx(int hIdx) {
528   if (hIdx==_ccsHandlerIdx) return 1;
529   if (hIdx==rep_fw_handler_idx) return 1;
530   return 0;
531 }
532
533 void CcsBuiltinsInit(char **argv);
534
535 CpvDeclare(int, cmiArgDebugFlag);
536 CpvDeclare(char *, displayArgument);
537 CpvDeclare(int, cpdSuspendStartup);
538
539 void CcsInit(char **argv)
540 {
541   CpvInitialize(CkHashtable_c, ccsTab);
542   CpvAccess(ccsTab) = CkCreateHashtable_string(sizeof(CcsHandlerRec),5);
543   CpvInitialize(CcsImplHeader *, ccsReq);
544   CpvAccess(ccsReq) = NULL;
545   _ccsHandlerIdx = CmiRegisterHandler((CmiHandler)req_fw_handler);
546   CpvInitialize(int, cmiArgDebugFlag);
547   CpvInitialize(char *, displayArgument);
548   CpvInitialize(int, cpdSuspendStartup);
549   CpvAccess(cmiArgDebugFlag) = 0;
550   CpvAccess(displayArgument) = NULL;
551   CpvAccess(cpdSuspendStartup) = 0;
552   
553   CcsBuiltinsInit(argv);
554
555   rep_fw_handler_idx = CmiRegisterHandler((CmiHandler)rep_fw_handler);
556 #if NODE_0_IS_CONVHOST
557 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
558   print_fw_handler_idx = CmiRegisterHandler((CmiHandler)print_fw_handler);
559 #endif
560   {
561    int ccs_serverPort=0;
562    char *ccs_serverAuth=NULL;
563    
564    if (CmiGetArgFlagDesc(argv,"++server", "Create a CCS server port") | 
565       CmiGetArgIntDesc(argv,"++server-port",&ccs_serverPort, "Listen on this TCP/IP port number") |
566       CmiGetArgStringDesc(argv,"++server-auth",&ccs_serverAuth, "Use this CCS authentication file")) 
567     if (CmiMyPe()==0)
568     {/*Create and occasionally poll on a CCS server port*/
569       CcsServer_new(NULL,&ccs_serverPort,ccs_serverAuth);
570       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)CcsServerCheck,NULL);
571     }
572   }
573 #endif
574   /* if in parallel debug mode i.e ++cpd, freeze */
575   if (CmiGetArgFlagDesc(argv, "+cpd", "Used *only* in conjunction with parallel debugger"))
576   {
577      CpvAccess(cmiArgDebugFlag) = 1;
578      if (CmiGetArgStringDesc(argv, "+DebugDisplay",&(CpvAccess(displayArgument)), "X display for gdb used only in cpd mode"))
579      {
580         if (CpvAccess(displayArgument) == NULL)
581             CmiPrintf("WARNING> NULL parameter for +DebugDisplay\n***");
582      }
583      else if (CmiMyPe() == 0)
584      {
585             /* only one processor prints the warning */
586             CmiPrintf("WARNING> x term for gdb needs to be specified as +DebugDisplay by debugger\n***\n");
587      }
588
589      if (CmiGetArgFlagDesc(argv, "+DebugSuspend", "Suspend execution at beginning of program")) {
590        CpvAccess(cpdSuspendStartup) = 1;
591      }
592   }
593
594   CcsReleaseMessages();
595 }
596
597 #endif /*CMK_CCS_AVAILABLE*/
598