Removed almost all warnings on origin2000.
[charm.git] / src / conv-core / convcore.c
1 #include <stdio.h>
2 #include <string.h>
3 #include "converse.h"
4 #include "conv-trace.h"
5 #include <errno.h>
6 #if NODE_0_IS_CONVHOST
7 #include <sys/types.h>
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <netdb.h>
11 #include <sys/time.h>
12 #endif
13
14 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
15 #include <sys/types.h>
16 #include <sys/time.h>
17 #endif
18
19 #if CMK_TIMER_USE_TIMES
20 #include <sys/times.h>
21 #include <limits.h>
22 #include <unistd.h>
23 #endif
24
25 #if CMK_TIMER_USE_GETRUSAGE
26 #include <sys/time.h>
27 #include <sys/resource.h>
28 #endif
29
30
31 /*****************************************************************************
32  *
33  * Unix Stub Functions
34  *
35  ****************************************************************************/
36
37 #if CMK_STRERROR_USE_SYS_ERRLIST
38 extern char *sys_errlist[];
39 char *strerror(i) int i; { return sys_errlist[i]; }
40 #endif
41
42 #ifdef MEMMONITOR
43 typedef unsigned long mmulong;
44 CpvDeclare(mmulong,MemoryUsage);
45 CpvDeclare(mmulong,HiWaterMark);
46 CpvDeclare(mmulong,ReportedHiWaterMark);
47 CpvDeclare(int,AllocCount);
48 CpvDeclare(int,BlocksAllocated);
49 #endif
50
51 #if CMK_SIGHOLD_USE_SIGMASK
52 #include <signal.h>
53 int sighold(sig) int sig;
54 { if (sigblock(sigmask(sig)) < 0) return -1;
55   else return 0; }
56 int sigrelse(sig) int sig;
57 { if (sigsetmask(sigblock(0)&(~sigmask(sig))) < 0) return -1;
58   else return 0; }
59 #endif
60
61 #define MAX_HANDLERS 512
62
63 #if CMK_NODE_QUEUE_AVAILABLE
64 void  *CmiGetNonLocalNodeQ();
65 #endif
66 void  *CmiGetNonLocal();
67 void   CmiNotifyIdle();
68
69 CpvDeclare(int, disable_sys_msgs);
70 CpvExtern(int,    CcdNumChecks) ;
71 CpvDeclare(void*, CsdSchedQueue);
72 #if CMK_NODE_QUEUE_AVAILABLE
73 CsvDeclare(void*, CsdNodeQueue);
74 CsvDeclare(CmiNodeLock, NodeQueueLock);
75 #endif
76 CpvDeclare(int,   CsdStopFlag);
77
78
79 /*****************************************************************************
80  *
81  * Some of the modules use this in their argument parsing.
82  *
83  *****************************************************************************/
84
85 static char *DeleteArg(argv)
86   char **argv;
87 {
88   char *res = argv[0];
89   if (res==0) { CmiError("Bad arglist."); exit(1); }
90   while (*argv) { argv[0]=argv[1]; argv++; }
91   return res;
92 }
93
94
95 /*****************************************************************************
96  *
97  * Statistics: currently, the following statistics are not updated by converse.
98  *
99  *****************************************************************************/
100
101 CpvDeclare(int, CstatsMaxChareQueueLength);
102 CpvDeclare(int, CstatsMaxForChareQueueLength);
103 CpvDeclare(int, CstatsMaxFixedChareQueueLength);
104 CpvStaticDeclare(int, CstatPrintQueueStatsFlag);
105 CpvStaticDeclare(int, CstatPrintMemStatsFlag);
106
107 void CstatsInit(argv)
108 char **argv;
109 {
110   int argc;
111   char **origArgv = argv;
112
113 #if CMK_WEB_MODE
114   void initUsage();
115 #endif
116
117 #ifdef MEMMONITOR
118   CpvInitialize(mmulong,MemoryUsage);
119   CpvAccess(MemoryUsage) = 0;
120   CpvInitialize(mmulong,HiWaterMark);
121   CpvAccess(HiWaterMark) = 0;
122   CpvInitialize(mmulong,ReportedHiWaterMark);
123   CpvAccess(ReportedHiWaterMark) = 0;
124   CpvInitialize(int,AllocCount);
125   CpvAccess(AllocCount) = 0;
126   CpvInitialize(int,BlocksAllocated);
127   CpvAccess(BlocksAllocated) = 0;
128 #endif
129
130   CpvInitialize(int, CstatsMaxChareQueueLength);
131   CpvInitialize(int, CstatsMaxForChareQueueLength);
132   CpvInitialize(int, CstatsMaxFixedChareQueueLength);
133   CpvInitialize(int, CstatPrintQueueStatsFlag);
134   CpvInitialize(int, CstatPrintMemStatsFlag);
135
136   CpvAccess(CstatsMaxChareQueueLength) = 0;
137   CpvAccess(CstatsMaxForChareQueueLength) = 0;
138   CpvAccess(CstatsMaxFixedChareQueueLength) = 0;
139   CpvAccess(CstatPrintQueueStatsFlag) = 0;
140   CpvAccess(CstatPrintMemStatsFlag) = 0;
141
142   while (*argv) {
143     if (strcmp(*argv, "+mems") == 0) {
144       CpvAccess(CstatPrintMemStatsFlag)=1;
145       DeleteArg(argv);
146     } else
147     if (strcmp(*argv, "+qs") == 0) {
148       CpvAccess(CstatPrintQueueStatsFlag)=1;
149       DeleteArg(argv);
150     } else
151     argv++;
152   }
153
154   argc = 0; argv=origArgv;
155   for(argc=0;argv[argc];argc++);
156 #ifndef CMK_OPTIMIZE
157   traceInit(&argc, argv);
158 #endif
159
160 #if CMK_WEB_MODE
161   initUsage();
162 #endif
163 }
164
165 int CstatMemory(i)
166 int i;
167 {
168   return 0;
169 }
170
171 int CstatPrintQueueStats()
172 {
173   return CpvAccess(CstatPrintQueueStatsFlag);
174 }
175
176 int CstatPrintMemStats()
177 {
178   return CpvAccess(CstatPrintMemStatsFlag);
179 }
180
181 /*****************************************************************************
182  *
183  * Cmi handler registration
184  *
185  *****************************************************************************/
186
187 CpvDeclare(CmiHandler*, CmiHandlerTable);
188 CpvStaticDeclare(int  , CmiHandlerCount);
189 CpvStaticDeclare(int  , CmiHandlerLocal);
190 CpvStaticDeclare(int  , CmiHandlerGlobal);
191 CpvDeclare(int,         CmiHandlerMax);
192
193 void CmiNumberHandler(n, h)
194 int n; CmiHandler h;
195 {
196   CmiHandler *tab;
197   int         max = CpvAccess(CmiHandlerMax);
198
199   tab = CpvAccess(CmiHandlerTable);
200   if (n >= max) {
201     int newmax = ((n<<1)+10);
202     int bytes = max*sizeof(CmiHandler);
203     int newbytes = newmax*sizeof(CmiHandler);
204     CmiHandler *new = (CmiHandler*)CmiAlloc(newbytes);
205     memcpy(new, tab, bytes);
206     memset(((char *)new)+bytes, 0, (newbytes-bytes));
207     free(tab); tab=new;
208     CpvAccess(CmiHandlerTable) = tab;
209     CpvAccess(CmiHandlerMax) = newmax;
210   }
211   tab[n] = h;
212 }
213
214 int CmiRegisterHandler(h)
215 CmiHandler h;
216 {
217   int Count = CpvAccess(CmiHandlerCount);
218   CmiNumberHandler(Count, h);
219   CpvAccess(CmiHandlerCount) = Count+3;
220   return Count;
221 }
222
223 int CmiRegisterHandlerLocal(h)
224 CmiHandler h;
225 {
226   int Local = CpvAccess(CmiHandlerLocal);
227   CmiNumberHandler(Local, h);
228   CpvAccess(CmiHandlerLocal) = Local+3;
229   return Local;
230 }
231
232 int CmiRegisterHandlerGlobal(h)
233 CmiHandler h;
234 {
235   int Global = CpvAccess(CmiHandlerGlobal);
236   if (CmiMyPe()!=0) 
237     CmiError("CmiRegisterHandlerGlobal must only be called on PE 0.\n");
238   CmiNumberHandler(Global, h);
239   CpvAccess(CmiHandlerGlobal) = Global+3;
240   return Global;
241 }
242
243 static void CmiHandlerInit()
244 {
245   CpvInitialize(CmiHandler *, CmiHandlerTable);
246   CpvInitialize(int         , CmiHandlerCount);
247   CpvInitialize(int         , CmiHandlerLocal);
248   CpvInitialize(int         , CmiHandlerGlobal);
249   CpvInitialize(int         , CmiHandlerMax);
250   CpvAccess(CmiHandlerCount)  = 0;
251   CpvAccess(CmiHandlerLocal)  = 1;
252   CpvAccess(CmiHandlerGlobal) = 2;
253   CpvAccess(CmiHandlerMax) = 100;
254   CpvAccess(CmiHandlerTable) = (CmiHandler *)malloc(100*sizeof(CmiHandler)) ;
255 }
256
257
258 /******************************************************************************
259  *
260  * CmiTimer
261  *
262  * Here are two possible implementations of CmiTimer.  Some machines don't
263  * select either, and define the timer in machine.c instead.
264  *
265  *****************************************************************************/
266
267 #if CMK_TIMER_USE_TIMES
268
269 CpvStaticDeclare(double, clocktick);
270 CpvStaticDeclare(int,inittime_wallclock);
271 CpvStaticDeclare(int,inittime_virtual);
272
273 void CmiTimerInit()
274 {
275   struct tms temp;
276   CpvInitialize(double, clocktick);
277   CpvInitialize(int, inittime_wallclock);
278   CpvInitialize(int, inittime_virtual);
279   CpvAccess(inittime_wallclock) = times(&temp);
280   CpvAccess(inittime_virtual) = temp.tms_utime + temp.tms_stime;
281   CpvAccess(clocktick) = 1.0 / (sysconf(_SC_CLK_TCK));
282 }
283
284 double CmiWallTimer()
285 {
286   struct tms temp;
287   double currenttime;
288   int now;
289
290   now = times(&temp);
291   currenttime = (now - CpvAccess(inittime_wallclock)) * CpvAccess(clocktick);
292   return (currenttime);
293 }
294
295 double CmiCpuTimer()
296 {
297   struct tms temp;
298   double currenttime;
299   int now;
300
301   times(&temp);
302   now = temp.tms_stime + temp.tms_utime;
303   currenttime = (now - CpvAccess(inittime_virtual)) * CpvAccess(clocktick);
304   return (currenttime);
305 }
306
307 double CmiTimer()
308 {
309   return CmiCpuTimer();
310 }
311
312 #endif
313
314 #if CMK_TIMER_USE_GETRUSAGE
315
316 CpvStaticDeclare(double, inittime_wallclock);
317 CpvStaticDeclare(double, inittime_virtual);
318
319 void CmiTimerInit()
320 {
321   struct timeval tv;
322   struct rusage ru;
323   CpvInitialize(double, inittime_wallclock);
324   CpvInitialize(double, inittime_virtual);
325   gettimeofday(&tv,0);
326   CpvAccess(inittime_wallclock) = (tv.tv_sec * 1.0) + (tv.tv_usec*0.000001);
327   getrusage(0, &ru); 
328   CpvAccess(inittime_virtual) =
329     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
330     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
331 }
332
333 double CmiCpuTimer()
334 {
335   struct rusage ru;
336   double currenttime;
337
338   getrusage(0, &ru);
339   currenttime =
340     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
341     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
342   return currenttime - CpvAccess(inittime_virtual);
343 }
344
345 double CmiWallTimer()
346 {
347   struct timeval tv;
348   double currenttime;
349
350   gettimeofday(&tv,0);
351   currenttime = (tv.tv_sec * 1.0) + (tv.tv_usec * 0.000001);
352   return currenttime - CpvAccess(inittime_wallclock);
353 }
354
355 double CmiTimer()
356 {
357   return CmiCpuTimer();
358 }
359
360 #endif
361
362 #if CMK_WEB_MODE
363 int appletFd = -1;
364 #endif
365
366 #if NODE_0_IS_CONVHOST
367 int serverFlag = 0;
368 extern int inside_comm;
369 CpvExtern(int, strHandlerID);
370
371 static int hostport, hostskt;
372 static int hostskt_ready_read;
373
374 CpvStaticDeclare(int, CHostHandlerIndex);
375 static unsigned int *nodeIPs;
376 static unsigned int *nodePorts;
377 static int numRegistered;
378
379 static void KillEveryoneCode(int n)
380 {
381   char str[128];
382   sprintf(str, "Fatal Error: code %d\n", n);
383   CmiAbort(str);
384 }
385
386 static void jsleep(int sec, int usec)
387 {
388   int ntimes,i;
389   struct timeval tm;
390
391   ntimes = sec*200 + usec/5000;
392   for(i=0;i<ntimes;i++) {
393     tm.tv_sec = 0;
394     tm.tv_usec = 5000;
395     while(1) {
396       if (select(0,NULL,NULL,NULL,&tm)==0) break;
397       if ((errno!=EBADF)&&(errno!=EINTR)) return;
398     }
399   }
400 }
401
402 void writeall(int fd, char *buf, int size)
403 {
404   int ok;
405   while (size) {
406     retry:
407     ok = write(fd, buf, size);
408     if ((ok<0)&&((errno==EBADF)||(errno==EINTR))) goto retry;
409     if (ok<=0) {
410       CmiAbort("Write failed ..\n");
411     }
412     size-=ok; buf+=ok;
413   }
414 }
415
416 static void skt_server(ppo, pfd)
417 unsigned int *ppo;
418 unsigned int *pfd;
419 {
420   int fd= -1;
421   int ok, len;
422   struct sockaddr_in addr;
423
424 retry:
425   fd = socket(PF_INET, SOCK_STREAM, 0);
426   if ((fd<0)&&((errno==EINTR)||(errno==EBADF))) goto retry;
427   if (fd < 0) { perror("socket 1"); KillEveryoneCode(93483); }
428   memset(&addr, 0, sizeof(addr));
429   addr.sin_family = AF_INET;
430   ok = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
431   if (ok < 0) { perror("bind"); KillEveryoneCode(22933); }
432   ok = listen(fd,5);
433   if (ok < 0) { perror("listen"); KillEveryoneCode(3948); }
434   len = sizeof(addr);
435   ok = getsockname(fd, (struct sockaddr *)&addr, &len);
436   if (ok < 0) { perror("getsockname"); KillEveryoneCode(93583); }
437
438   *pfd = fd;
439   *ppo = ntohs(addr.sin_port);
440 }
441
442 int skt_connect(ip, port, seconds)
443 unsigned int ip; int port; int seconds;
444 {
445   struct sockaddr_in remote; short sport=port;
446   int fd, ok, begin;
447
448   /* create an address structure for the server */
449   memset(&remote, 0, sizeof(remote));
450   remote.sin_family = AF_INET;
451   remote.sin_port = htons(sport);
452   remote.sin_addr.s_addr = htonl(ip);
453
454   begin = time(0); ok= -1;
455   while (time(0)-begin < seconds) {
456   sock:
457     fd = socket(AF_INET, SOCK_STREAM, 0);
458     if ((fd<0)&&((errno==EINTR)||(errno==EBADF))) goto sock;
459     if (fd < 0) KillEveryoneCode(234234);
460
461   conn:
462     ok = connect(fd, (struct sockaddr *)&(remote), sizeof(remote));
463     if (ok>=0) break;
464     close(fd);
465     switch (errno) {
466     case EINTR: case EBADF: case EALREADY: case EISCONN: break;
467     case ECONNREFUSED: jsleep(1,0); break;
468     case EADDRINUSE: jsleep(1,0); break;
469     case EADDRNOTAVAIL: jsleep(1,0); break;
470     default: return -1;
471     }
472   }
473   if (ok<0) return -1;
474   return fd;
475 }
476
477 static void skt_accept(src, pip, ppo, pfd)
478 int src;
479 unsigned int *pip;
480 unsigned int *ppo;
481 unsigned int *pfd;
482 {
483   int i, fd;
484   struct sockaddr_in remote;
485   i = sizeof(remote);
486  acc:
487   fd = accept(src, (struct sockaddr *)&remote, &i);
488   if ((fd<0)&&((errno==EINTR)||(errno==EBADF))) goto acc;
489   if (fd<0) { perror("accept"); KillEveryoneCode(39489); }
490   *pip=htonl(remote.sin_addr.s_addr);
491   *ppo=htons(remote.sin_port);
492   *pfd=fd;
493 }
494
495 static void CheckSocketsReady(void)
496 {
497   static fd_set rfds;
498   static fd_set wfds;
499   struct timeval tmo;
500   int nreadable;
501
502   FD_ZERO(&rfds);
503   FD_ZERO(&wfds);
504   FD_SET(hostskt, &rfds);
505   FD_SET(hostskt, &wfds);
506   tmo.tv_sec = 0;
507   tmo.tv_usec = 0;
508   nreadable = select(FD_SETSIZE, &rfds, &wfds, NULL, &tmo);
509   if (nreadable <= 0) {
510     hostskt_ready_read = 0;
511     return;
512   }
513   hostskt_ready_read = (FD_ISSET(hostskt, &rfds));
514 }
515
516 void CHostRegister(void)
517 {
518   struct hostent *hostent;
519   char hostname[100];
520   int ip;
521   char *msg;
522   int *ptr;
523   int msgSize = CmiMsgHeaderSizeBytes + 3 * sizeof(unsigned int);
524
525   if(gethostname(hostname, 99) < 0) {
526     hostent = gethostent();
527   }
528   else{
529     hostent = gethostbyname(hostname);
530   }
531   if (hostent == 0)
532     ip = 0x7f000001;
533   else
534     ip = htonl(*((CmiInt4 *)(hostent->h_addr_list[0])));
535
536   msg = (char *)CmiAlloc(msgSize * sizeof(char));
537   ptr = (int *)(msg + CmiMsgHeaderSizeBytes);
538   ptr[0] = CmiMyPe();
539   ptr[1] = ip;
540   ptr[2] = hostport;
541   CmiSetHandler(msg, CpvAccess(CHostHandlerIndex));
542   CmiSyncSendAndFree(0, msgSize, msg);
543 }
544
545 unsigned int clientIP, clientPort, clientKillPort;
546
547 void CHostGetOne()
548 {
549   char line[10000];
550   char rest[1000];
551   int ip, port, fd;  FILE *f;
552 #if CMK_WEB_MODE
553   char hndlrId[100];
554   int dont_close = 0;
555   int svrip, svrport;
556 #endif
557
558   skt_accept(hostskt, &ip, &port, &fd);
559   f = fdopen(fd,"r");
560   while (fgets(line, 9999, f)) {
561     if (strncmp(line, "req ", 4)==0) {
562       char cmd[5], *msg;
563       int pe, size, len;
564       int ret;
565 #if CMK_WEB_MODE   
566       sscanf(line, "%s%d%d%d%d%s", cmd, &pe, &size, &svrip, &svrport, hndlrId);
567       if(strcmp(hndlrId, "MonitorHandler") == 0) {
568         appletFd = fd;
569         dont_close = 1;
570       }
571 #else
572       sscanf(line, "%s%d%d", cmd, &pe, &size);
573 #endif
574       /* DEBUGGING */
575       CmiPrintf("Line = %s\n", line);
576
577       sscanf(line, "%s%d%d", cmd, &pe, &size);
578       len = strlen(line);
579       msg = (char *) CmiAlloc(len+size+CmiMsgHeaderSizeBytes+1);
580       if (!msg)
581         CmiPrintf("%d: Out of mem\n", CmiMyPe());
582       CmiSetHandler(msg, CpvAccess(strHandlerID));
583       CmiPrintf("hdlr ID = %d\n", CpvAccess(strHandlerID));
584       strcpy(msg+CmiMsgHeaderSizeBytes, line);
585       ret = fread(msg+CmiMsgHeaderSizeBytes+len, 1, size, f);
586       CmiPrintf("size = %d, ret =%d\n", size, ret);
587       msg[CmiMsgHeaderSizeBytes+len+size] = '\0';
588       CmiSyncSendAndFree(CmiMyPe(), CmiMsgHeaderSizeBytes+len+size+1, msg);
589
590 #if CMK_USE_PERSISTENT_CCS
591       if(dont_close == 1) break;
592 #endif
593
594     }
595     else if (strncmp(line, "getinfo ", 8)==0) {
596       char pre[1024], reply[1024], ans[1024];
597       char cmd[20];
598       int fd;
599       int i;
600       int nscanfread;
601       int nodetab_rank0_size = CmiNumNodes();
602
603       /* DEBUGGING */
604       CmiPrintf("Line = %s\n", line);
605       nscanfread = sscanf(line, "%s%u%u", cmd, &clientIP, &clientPort);
606       if(nscanfread != 3){
607
608         /* DEBUGGING */
609         CmiPrintf("Entering further read...\n");
610
611         fgets(rest, 999, f);
612
613         /* DEBUGGING */
614         CmiPrintf("Rest = %s\n", rest);
615
616         sscanf(rest, "%u%u", &clientIP, &clientPort);
617       }
618       clientIP = (CmiInt4) clientIP;
619       strcpy(pre, "info");
620       reply[0] = 0;
621       sprintf(ans, "%d ", nodetab_rank0_size);
622       strcat(reply, ans);
623       for(i=0;i<nodetab_rank0_size;i++) {
624         strcat(reply, "1 ");
625       }
626       for(i=0;i<nodetab_rank0_size;i++) {
627         sprintf(ans, "%d ", (CmiInt4) nodeIPs[i]);
628         strcat(reply, ans);
629       }
630       for(i=0;i<nodetab_rank0_size;i++) {
631         sprintf(ans, "%d ", nodePorts[i]);
632         strcat(reply, ans);
633       }
634       fd = skt_connect(clientIP, clientPort, 60);
635
636       /** Debugging **/
637       CmiPrintf("After Connect for getinfo reply\n");
638
639
640       if (fd<=0) KillEveryoneCode(2932);
641       write(fd, pre, strlen(pre));
642       write(fd, " ", 1);
643       write(fd, reply, strlen(reply));
644       close(fd);
645     }
646     else if (strncmp(line, "clientdata", strlen("clientdata"))==0){
647       int nread;
648       char cmd[20];
649       
650       nread = sscanf(line, "%s%d", cmd, &clientKillPort);
651       if(nread != 2){
652         fgets(rest, 999, f);
653         
654         /* DEBUGGING */
655         CmiPrintf("Rest = %s\n", rest);
656         
657         sscanf(rest, "%d", &clientKillPort);
658
659         /* Debugging */
660
661         CmiPrintf("After sscanf\n");
662       }
663     }
664     else {
665       CmiPrintf("Request: %s\n", line);
666       KillEveryoneCode(2933);
667     }
668   }
669   CmiPrintf("Out of fgets loop\n");
670 #if CMK_WEB_MODE
671   if(dont_close==0) {
672 #endif
673   fclose(f);
674   close(fd);
675 #if CMK_WEB_MODE
676   }
677 #endif
678 }
679
680 static void CommunicationServer()
681 {
682   if(inside_comm)
683     return;
684     CheckSocketsReady();
685      /*if (hostskt_ready_read) { CHostGetOne(); continue; }*/
686 }
687
688 void CHostHandler(char *msg)
689 {
690   int pe;
691   int *ptr = (int *)(msg + CmiMsgHeaderSizeBytes);
692   pe = ptr[0];
693   nodeIPs[pe] = (unsigned int)(ptr[1]);
694   nodePorts[pe] = (unsigned int)(ptr[2]);
695   numRegistered++;
696  
697   if(numRegistered == CmiNumPes()){
698     if (serverFlag == 1) 
699       CmiPrintf("Server IP = %u, Server port = %u $\n",(CmiInt4) nodeIPs[0], nodePorts[0]);
700   }
701 }
702
703 void CHostInit()
704 {
705   nodeIPs = (unsigned int *)malloc(CmiNumPes() * sizeof(unsigned int));
706   nodePorts = (unsigned int *)malloc(CmiNumPes() * sizeof(unsigned int));
707   CpvInitialize(int, CHostHandlerIndex);
708   CpvAccess(CHostHandlerIndex) = CmiRegisterHandler(CHostHandler);
709 }
710
711 #endif
712 /******************************************************************************
713  *
714  * CmiEnableAsyncIO
715  *
716  * The net and tcp versions use a bunch of unix processes talking to each
717  * other via file descriptors.  We need for a signal SIGIO to be generated
718  * each time a message arrives, making it possible to write a signal
719  * handler to handle the messages.  The vast majority of unixes can,
720  * in fact, do this.  However, there isn't any standard for how this is
721  * supposed to be done, so each version of UNIX has a different set of
722  * calls to turn this signal on.  So, there is like one version here for
723  * every major brand of UNIX.
724  *
725  *****************************************************************************/
726
727 #if CMK_ASYNC_USE_FIOASYNC_AND_FIOSETOWN
728 #include <sys/filio.h>
729 void CmiEnableAsyncIO(fd)
730 int fd;
731 {
732   int pid = getpid();
733   int async = 1;
734   if ( ioctl(fd, FIOSETOWN, &pid) < 0  ) {
735     CmiError("setting socket owner: %s\n", strerror(errno)) ;
736     exit(1);
737   }
738   if ( ioctl(fd, FIOASYNC, &async) < 0 ) {
739     CmiError("setting socket async: %s\n", strerror(errno)) ;
740     exit(1);
741   }
742 }
743 #endif
744
745 #if CMK_ASYNC_USE_FIOASYNC_AND_SIOCSPGRP
746 #include <sys/filio.h>
747 void CmiEnableAsyncIO(fd)
748 int fd;
749 {
750   int pid = -getpid();
751   int async = 1;
752   if ( ioctl(fd, SIOCSPGRP, &pid) < 0  ) {
753     CmiError("setting socket owner: %s\n", strerror(errno)) ;
754     exit(1);
755   }
756   if ( ioctl(fd, FIOASYNC, &async) < 0 ) {
757     CmiError("setting socket async: %s\n", strerror(errno)) ;
758     exit(1);
759   }
760 }
761 #endif
762
763 #if CMK_ASYNC_USE_FIOSSAIOSTAT_AND_FIOSSAIOOWN
764 #include <sys/ioctl.h>
765 void CmiEnableAsyncIO(fd)
766 int fd;
767 {
768   int pid = getpid();
769   int async = 1;
770   if ( ioctl(fd, FIOSSAIOOWN, &pid) < 0  ) {
771     CmiError("setting socket owner: %s\n", strerror(errno)) ;
772     exit(1);
773   }
774   if ( ioctl(fd, FIOSSAIOSTAT, &async) < 0 ) {
775     CmiError("setting socket async: %s\n", strerror(errno)) ;
776     exit(1);
777   }
778 }
779 #endif
780
781 #if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
782 #include <fcntl.h>
783 void CmiEnableAsyncIO(fd)
784 int fd;
785 {
786   if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
787     CmiError("setting socket owner: %s\n", strerror(errno)) ;
788     exit(1);
789   }
790   if ( fcntl(fd, F_SETFL, FASYNC) < 0 ) {
791     CmiError("setting socket async: %s\n", strerror(errno)) ;
792     exit(1);
793   }
794 }
795 #endif
796
797 #if CMK_SIGNAL_USE_SIGACTION
798 #include <signal.h>
799 void CmiSignal(sig1, sig2, sig3, handler)
800 int sig1, sig2, sig3;
801 void (*handler)();
802 {
803   struct sigaction in, out ;
804   in.sa_handler = handler;
805   sigemptyset(&in.sa_mask);
806   if (sig1) sigaddset(&in.sa_mask, sig1);
807   if (sig2) sigaddset(&in.sa_mask, sig2);
808   if (sig3) sigaddset(&in.sa_mask, sig3);
809   in.sa_flags = 0;
810   if (sig1) if (sigaction(sig1, &in, &out)<0) exit(1);
811   if (sig2) if (sigaction(sig2, &in, &out)<0) exit(1);
812   if (sig3) if (sigaction(sig3, &in, &out)<0) exit(1);
813 }
814 #endif
815
816 #if CMK_SIGNAL_USE_SIGACTION_WITH_RESTART
817 #include <signal.h>
818 void CmiSignal(sig1, sig2, sig3, handler)
819 int sig1, sig2, sig3;
820 void (*handler)();
821 {
822   struct sigaction in, out ;
823   in.sa_handler = handler;
824   sigemptyset(&in.sa_mask);
825   if (sig1) sigaddset(&in.sa_mask, sig1);
826   if (sig2) sigaddset(&in.sa_mask, sig2);
827   if (sig3) sigaddset(&in.sa_mask, sig3);
828   in.sa_flags = SA_RESTART;
829   if (sig1) if (sigaction(sig1, &in, &out)<0) exit(1);
830   if (sig2) if (sigaction(sig2, &in, &out)<0) exit(1);
831   if (sig3) if (sigaction(sig3, &in, &out)<0) exit(1);
832 }
833 #endif
834
835 #if CMK_DEBUG_MODE
836
837 CpvDeclare(int, freezeModeFlag);
838 CpvDeclare(int, continueFlag);
839 CpvDeclare(int, stepFlag);
840 CpvDeclare(void *, debugQueue);
841 unsigned int freezeIP;
842 int freezePort;
843 char* breakPointHeader;
844 char* breakPointContents;
845
846 void dummyF()
847 {
848 }
849
850 static void CpdDebugHandler(char *msg)
851 {
852   char *reply, *temp;
853   int index;
854   
855   if(CcsIsRemoteRequest()) {
856     char name[128];
857     unsigned int ip, port;
858     CcsCallerId(&ip, &port);
859     sscanf(msg+CmiMsgHeaderSizeBytes, "%s", name);
860     reply = NULL;
861
862     if (strcmp(name, "freeze") == 0) {
863       CpdFreeze();
864       msgListCleanup();
865       msgListCache();
866       CmiPrintf("freeze received\n");
867     }
868     else if (strcmp(name, "unfreeze") == 0) {
869       CpdUnFreeze();
870       msgListCleanup();
871       CmiPrintf("unfreeze received\n");
872     }
873     else if (strcmp(name, "getObjectList") == 0){
874       CmiPrintf("getObjectList received\n");
875       reply = getObjectList();
876       CmiPrintf("list obtained");
877       if(reply == NULL){
878         CmiPrintf("list empty");
879         CcsSendReply(ip, port, strlen("$") + 1, "$");
880       }
881       else{
882         CmiPrintf("list : %s\n", reply);
883         CcsSendReply(ip, port, strlen(reply) + 1, reply);
884         free(reply);
885       }
886     }
887     else if(strncmp(name,"getObjectContents",strlen("getObjectContents"))==0){
888       CmiPrintf("getObjectContents received\n");
889       temp = strstr(name, "#");
890       temp++;
891       sscanf(temp, "%d", &index);
892       reply = getObjectContents(index);
893       CmiPrintf("Object Contents : %s\n", reply);
894       CcsSendReply(ip, port, strlen(reply) + 1, reply);
895       free(reply);
896     }
897     else if (strcmp(name, "getMsgListSched") == 0){
898       CmiPrintf("getMsgListSched received\n");
899       reply = getMsgListSched();
900       if(reply == NULL)
901         CcsSendReply(ip, port, strlen("$") + 1, "$");
902       else{
903         CcsSendReply(ip, port, strlen(reply) + 1, reply);
904         free(reply);
905       }
906     }
907     else if (strcmp(name, "getMsgListFIFO") == 0){
908       CmiPrintf("getMsgListFIFO received\n");
909       reply = getMsgListFIFO();
910       if(reply == NULL)
911         CcsSendReply(ip, port, strlen("$") + 1, "$");
912       else{
913         CcsSendReply(ip, port, strlen(reply) + 1, reply);
914         free(reply);
915       }
916     }
917     else if (strcmp(name, "getMsgListPCQueue") == 0){
918       CmiPrintf("getMsgListPCQueue received\n");
919       reply = getMsgListPCQueue();
920       if(reply == NULL)
921         CcsSendReply(ip, port, strlen("$") + 1, "$");
922       else{
923         CcsSendReply(ip, port, strlen(reply) + 1, reply);
924         free(reply);
925       }
926     }
927     else if (strcmp(name, "getMsgListDebug") == 0){
928       CmiPrintf("getMsgListDebug received\n");
929       reply = getMsgListDebug();
930       if(reply == NULL)
931         CcsSendReply(ip, port, strlen("$") + 1, "$");
932       else{
933         CcsSendReply(ip, port, strlen(reply) + 1, reply);
934         free(reply);
935       }
936     }
937     else if(strncmp(name,"getMsgContentsSched",strlen("getMsgContentsSched"))==0){
938       CmiPrintf("getMsgContentsSched received\n");
939       temp = strstr(name, "#");
940       temp++;
941       sscanf(temp, "%d", &index);
942       reply = getMsgContentsSched(index);
943       CmiPrintf("Message Contents : %s\n", reply);
944       CcsSendReply(ip, port, strlen(reply) + 1, reply);
945       free(reply);
946     }
947     else if(strncmp(name,"getMsgContentsFIFO",strlen("getMsgContentsFIFO"))==0){
948       CmiPrintf("getMsgContentsFIFO received\n");
949       temp = strstr(name, "#");
950       temp++;
951       sscanf(temp, "%d", &index);
952       reply = getMsgContentsFIFO(index);
953       CmiPrintf("Message Contents : %s\n", reply);
954       CcsSendReply(ip, port, strlen(reply) + 1, reply);
955       free(reply);
956     }
957     else if (strncmp(name, "getMsgContentsPCQueue", strlen("getMsgContentsPCQueue")) == 0){
958       CmiPrintf("getMsgContentsPCQueue received\n");
959       temp = strstr(name, "#");
960       temp++;
961       sscanf(temp, "%d", &index);
962       reply = getMsgContentsPCQueue(index);
963       CmiPrintf("Message Contents : %s\n", reply);
964       CcsSendReply(ip, port, strlen(reply) + 1, reply);
965       free(reply);
966     }
967     else if (strncmp(name, "getMsgContentsDebug", strlen("getMsgContentsDebug")) == 0){
968       CmiPrintf("getMsgContentsDebug received\n");
969       temp = strstr(name, "#");
970       temp++;
971       sscanf(temp, "%d", &index);
972       reply = getMsgContentsDebug(index);
973       CmiPrintf("Message Contents : %s\n", reply);
974       CcsSendReply(ip, port, strlen(reply) + 1, reply);
975       free(reply);
976     } 
977     else if (strncmp(name, "step", strlen("step")) == 0){
978       CmiPrintf("step received\n");
979       CpvAccess(stepFlag) = 1;
980       temp = strstr(name, "#");
981       temp++;
982       sscanf(temp, "%d", &freezePort);
983       freezeIP = ip;
984       CpdUnFreeze();
985     }
986     else if (strncmp(name, "continue", strlen("continue")) == 0){
987       CmiPrintf("continue received\n");
988       CpvAccess(continueFlag) = 1;
989       temp = strstr(name, "#");
990       temp++;
991       sscanf(temp, "%d", &freezePort);
992       freezeIP = ip;
993       CpdUnFreeze();
994     }
995     else if (strcmp(name, "getBreakStepContents") == 0){
996       CmiPrintf("getBreakStepContents received\n");
997       if(breakPointHeader == 0){
998         CcsSendReply(ip, port, strlen("$") + 1, "$");
999       }
1000       else{
1001         reply = (char *)malloc(strlen(breakPointHeader) + strlen(breakPointContents) + 1);
1002         strcpy(reply, breakPointHeader);
1003         strcat(reply, "@");
1004         strcat(reply, breakPointContents);
1005         CcsSendReply(ip, port, strlen(reply) + 1, reply);
1006         free(reply);
1007       }
1008     }
1009     else if (strcmp(name, "getSymbolTableInfo") == 0){
1010       CmiPrintf("getSymbolTableInfo received");
1011       reply = getSymbolTableInfo();
1012       CcsSendReply(ip, port, strlen(reply) + 1, reply);
1013       reply = getBreakPoints();
1014       CcsSendReply(ip, port, strlen(reply) + 1, reply);
1015       free(reply);
1016     }
1017     else if (strncmp(name, "setBreakPoint", strlen("setBreakPoint")) == 0){
1018       CmiPrintf("setBreakPoint received\n");
1019       temp = strstr(name, "#");
1020       temp++;
1021       setBreakPoints(temp);
1022     }
1023     else if (strncmp(name, "gdbRequest", strlen("gdbRequest")) == 0){
1024       CmiPrintf("gdbRequest received\n");
1025       dummyF();
1026     }
1027
1028     else if (strcmp(name, "quit") == 0){
1029       CpdUnFreeze();
1030       CsdExitScheduler();
1031     }
1032     else{
1033       CmiPrintf("incorrect command:%s received,len=%ld\n",name,strlen(name));
1034     }
1035   }
1036 }
1037
1038 void *FIFO_Create(void);
1039
1040 void CpdInit(void)
1041 {
1042   CpvInitialize(int, freezeModeFlag);
1043   CpvAccess(freezeModeFlag) = 0;
1044
1045   CpvInitialize(int, continueFlag);
1046   CpvInitialize(int, stepFlag);
1047   CpvAccess(continueFlag) = 0;
1048   CpvAccess(stepFlag) = 0;
1049
1050   CpvInitialize(void *, debugQueue);
1051   CpvAccess(debugQueue) = FIFO_Create();
1052     
1053   CpdInitializeObjectTable();
1054   CpdInitializeHandlerArray();
1055   CpdInitializeBreakPoints();
1056
1057   CcsRegisterHandler("DebugHandler", CpdDebugHandler);
1058 }  
1059
1060 void CpdFreeze(void)
1061 {
1062   CpvAccess(freezeModeFlag) = 1;
1063 }  
1064
1065 void CpdUnFreeze(void)
1066 {
1067   CpvAccess(freezeModeFlag) = 0;
1068 }  
1069
1070 #endif
1071
1072 #if CMK_WEB_MODE
1073
1074 #define WEB_INTERVAL 2000
1075 #define MAXFNS 20
1076
1077 /* For Web Performance */
1078 typedef int (*CWebFunction)();
1079 unsigned int appletIP;
1080 unsigned int appletPort;
1081 int countMsgs;
1082 char **valueArray;
1083 CWebFunction CWebPerformanceFunctionArray[MAXFNS];
1084 int CWebNoOfFns;
1085 CpvDeclare(int, CWebPerformanceDataCollectionHandlerIndex);
1086 CpvDeclare(int, CWebHandlerIndex);
1087
1088 static void sendDataFunction(void)
1089 {
1090   char *reply;
1091   int len = 0, i;
1092
1093   for(i=0; i<CmiNumPes(); i++){
1094     len += (strlen((char*)(valueArray[i]+
1095                            CmiMsgHeaderSizeBytes+sizeof(int)))+1);
1096     /* for the spaces in between */
1097   }
1098   len+=6; /* for 'perf ' and the \0 at the end */
1099
1100   reply = (char *)malloc(len * sizeof(char));
1101   strcpy(reply, "perf ");
1102
1103   for(i=0; i<CmiNumPes(); i++){
1104     strcat(reply, (valueArray[i] + CmiMsgHeaderSizeBytes + sizeof(int)));
1105     strcat(reply, " ");
1106   }
1107
1108   /* Do the CcsSendReply */
1109 #if CMK_USE_PERSISTENT_CCS
1110   CcsSendReplyFd(appletIP, appletPort, strlen(reply) + 1, reply);
1111 #else
1112   CcsSendReply(appletIP, appletPort, strlen(reply) + 1, reply);
1113 #endif
1114   /* CmiPrintf("Reply = %s\n", reply); */
1115   free(reply);
1116
1117   /* Free valueArray contents */
1118   for(i = 0; i < CmiNumPes(); i++){
1119     CmiFree(valueArray[i]);
1120     valueArray[i] = 0;
1121   }
1122
1123   countMsgs = 0;
1124 }
1125
1126 void CWebPerformanceDataCollectionHandler(char *msg){
1127   int src;
1128   char *prev;
1129
1130   if(CmiMyPe() != 0){
1131     CmiAbort("Wrong processor....\n");
1132   }
1133   src = ((int *)(msg + CmiMsgHeaderSizeBytes))[0];
1134   CmiGrabBuffer((void **)&msg);
1135   prev = valueArray[src]; /* Previous value, ideally 0 */
1136   valueArray[src] = (msg);
1137   if(prev == 0) countMsgs++;
1138   else CmiFree(prev);
1139
1140   if(countMsgs == CmiNumPes()){
1141     sendDataFunction();
1142   }
1143 }
1144
1145 void CWebPerformanceGetData(void *dummy)
1146 {
1147   char *msg, data[100];
1148   int msgSize;
1149   int i;
1150
1151   if(appletIP == 0) {
1152     return;  /* No use if client is not yet connected */
1153   }
1154
1155   strcpy(data, "");
1156   /* Evaluate each of the functions and get the values */
1157   for(i = 0; i < CWebNoOfFns; i++)
1158     sprintf(data, "%s %d", data, (*(CWebPerformanceFunctionArray[i]))());
1159
1160   msgSize = (strlen(data)+1) + sizeof(int) + CmiMsgHeaderSizeBytes;
1161   msg = (char *)CmiAlloc(msgSize);
1162   ((int *)(msg + CmiMsgHeaderSizeBytes))[0] = CmiMyPe();
1163   strcpy(msg + CmiMsgHeaderSizeBytes + sizeof(int), data);
1164   CmiSetHandler(msg, CpvAccess(CWebPerformanceDataCollectionHandlerIndex));
1165   CmiSyncSendAndFree(0, msgSize, msg);
1166
1167   CcdCallFnAfter(CWebPerformanceGetData, 0, WEB_INTERVAL);
1168 }
1169
1170 void CWebPerformanceRegisterFunction(CWebFunction fn)
1171 {
1172   CWebPerformanceFunctionArray[CWebNoOfFns] = fn;
1173   CWebNoOfFns++;
1174 }
1175
1176 static void CWebHandler(char *msg){
1177   int msgSize;
1178   char *getStuffMsg;
1179   int i;
1180
1181   if(CcsIsRemoteRequest()) {
1182     char name[32];
1183     unsigned int ip, port;
1184
1185     CcsCallerId(&ip, &port);
1186     sscanf(msg+CmiMsgHeaderSizeBytes, "%s", name);
1187
1188     if(strcmp(name, "getStuff") == 0){
1189       appletIP = ip;
1190       appletPort = port;
1191
1192       valueArray = (char **)malloc(sizeof(char *) * CmiNumPes());
1193       for(i = 0; i < CmiNumPes(); i++)
1194         valueArray[i] = 0;
1195
1196       for(i = 1; i < CmiNumPes(); i++){
1197         msgSize = CmiMsgHeaderSizeBytes + 2*sizeof(int);
1198         getStuffMsg = (char *)CmiAlloc(msgSize);
1199         ((int *)(getStuffMsg + CmiMsgHeaderSizeBytes))[0] = appletIP;
1200         ((int *)(getStuffMsg + CmiMsgHeaderSizeBytes))[1] = appletPort;
1201         CmiSetHandler(getStuffMsg, CpvAccess(CWebHandlerIndex));
1202         CmiSyncSendAndFree(i, msgSize, getStuffMsg);
1203
1204         CcdCallFnAfter(CWebPerformanceGetData, 0, WEB_INTERVAL);
1205       }
1206     }
1207     else{
1208       CmiPrintf("incorrect command:%s received, len=%ld\n",name,strlen(name));
1209     }
1210   }
1211   else{
1212     /* Ordinary converse message */
1213     appletIP = ((int *)(msg + CmiMsgHeaderSizeBytes))[0];
1214     appletPort = ((int *)(msg + CmiMsgHeaderSizeBytes))[1];
1215
1216     CcdCallFnAfter(CWebPerformanceGetData, 0, WEB_INTERVAL);
1217   }
1218 }
1219
1220 int f2()
1221 {
1222   return(CqsLength(CpvAccess(CsdSchedQueue)));
1223 }
1224
1225 int f3()
1226 {
1227   struct timeval tmo;
1228
1229   gettimeofday(&tmo, NULL);
1230   return(tmo.tv_sec % 10 + CmiMyPe() * 3);
1231 }
1232
1233 /** ADDED 2-14-99 BY MD FOR USAGE TRACKING (TEMPORARY) **/
1234
1235 #define CkUTimer()      ((int)(CmiWallTimer() * 1000000.0))
1236
1237 typedef unsigned int un_int;
1238 CpvDeclare(un_int, startTime);
1239 CpvDeclare(un_int, beginTime);
1240 CpvDeclare(un_int, usedTime);
1241 CpvDeclare(int, PROCESSING);
1242
1243 /* Call this when the program is started
1244  -> Whenever traceModuleInit would be called
1245  -> -> see conv-core/convcore.c
1246 */
1247 void initUsage()
1248 {
1249    CpvInitialize(un_int, startTime);
1250    CpvInitialize(un_int, beginTime);
1251    CpvInitialize(un_int, usedTime);
1252    CpvInitialize(int, PROCESSING);
1253    CpvAccess(beginTime)  = CkUTimer();
1254    CpvAccess(usedTime)   = 0;
1255    CpvAccess(PROCESSING) = 0;
1256 }
1257
1258 int getUsage()
1259 {
1260    int usage = 0;
1261    un_int time      = CkUTimer();
1262    un_int totalTime = time - CpvAccess(beginTime);
1263
1264    if(CpvAccess(PROCESSING))
1265    {
1266       CpvAccess(usedTime) += time - CpvAccess(startTime);
1267       CpvAccess(startTime) = time;
1268    }
1269    if(totalTime > 0)
1270       usage = (100 * CpvAccess(usedTime))/totalTime;
1271    CpvAccess(usedTime)  = 0;
1272    CpvAccess(beginTime) = time;
1273    return usage;
1274 }
1275
1276 /* Call this when a BEGIN_PROCESSING event occurs
1277  -> Whenever a trace_begin_execute or trace_begin_charminit
1278     would be called
1279  -> -> See ck-core/init.c,main.c and conv-core/convcore.c
1280 */
1281 void usageStart()
1282 {
1283    if(CpvAccess(PROCESSING)) return;
1284
1285    CpvAccess(startTime)  = CkUTimer();
1286    CpvAccess(PROCESSING) = 1;
1287 }
1288
1289 /* Call this when an END_PROCESSING event occurs
1290  -> Whenever a trace_end_execute or trace_end_charminit
1291     would be called
1292  -> -> See ck-core/init.c,main.c and conv-core/threads.c
1293 */
1294 void usageStop()
1295 {
1296    if(!CpvAccess(PROCESSING)) return;
1297
1298    CpvAccess(usedTime)   += CkUTimer() - CpvAccess(startTime);
1299    CpvAccess(PROCESSING) = 0;
1300 }
1301
1302 void CWebInit(void)
1303 {
1304   CcsRegisterHandler("MonitorHandler", CWebHandler);
1305
1306   CpvInitialize(int, CWebHandlerIndex);
1307   CpvAccess(CWebHandlerIndex) = CmiRegisterHandler(CWebHandler);
1308
1309   CpvInitialize(int, CWebPerformanceDataCollectionHandlerIndex);
1310   CpvAccess(CWebPerformanceDataCollectionHandlerIndex) =
1311     CmiRegisterHandler(CWebPerformanceDataCollectionHandler);
1312
1313   CWebPerformanceRegisterFunction(getUsage);
1314   CWebPerformanceRegisterFunction(f2);
1315
1316 }
1317
1318 #endif
1319
1320 /*****************************************************************************
1321  *
1322  * The following is the CsdScheduler function.  A common
1323  * implementation is provided below.  The machine layer can provide an
1324  * alternate implementation if it so desires.
1325  *
1326  * void CmiDeliversInit()
1327  *
1328  *      - CmiInit promises to call this before calling CmiDeliverMsgs
1329  *        or any of the other functions in this section.
1330  *
1331  * int CmiDeliverMsgs(int maxmsgs)
1332  *
1333  *      - CmiDeliverMsgs will retrieve up to maxmsgs that were transmitted
1334  *        with the Cmi, and will invoke their handlers.  It does not wait
1335  *        if no message is unavailable.  Instead, it returns the quantity
1336  *        (maxmsgs-delivered), where delivered is the number of messages it
1337  *        delivered.
1338  *
1339  * void CmiDeliverSpecificMsg(int handlerno)
1340  *
1341  *      - Waits for a message with the specified handler to show up, then
1342  *        invokes the message's handler.  Note that unlike CmiDeliverMsgs,
1343  *        This function _does_ wait.
1344  *
1345  * void CmiGrabBuffer(void **bufptrptr)
1346  *
1347  *      - When CmiDeliverMsgs or CmiDeliverSpecificMsgs calls a handler,
1348  *        the handler receives a pointer to a buffer containing the message.
1349  *        The buffer does not belong to the handler, eg, the handler may not
1350  *        free the buffer.  Instead, the buffer will be automatically reused
1351  *        or freed as soon as the handler returns.  If the handler wishes to
1352  *        keep a copy of the data after the handler returns, it may do so by
1353  *        calling CmiGrabBuffer and passing it a pointer to a variable which
1354  *        in turn contains a pointer to the system buffer.  The variable will
1355  *        be updated to contain a pointer to a handler-owned buffer containing
1356  *        the same data as before.  The handler then has the responsibility of
1357  *        making sure the buffer eventually gets freed.  Example:
1358  *
1359  * void myhandler(void *msg)
1360  * {
1361  *    CmiGrabBuffer(&msg);      // Claim ownership of the message buffer
1362  *    ... rest of handler ...
1363  *    CmiFree(msg);             // I have the right to free it or
1364  *                              // keep it, as I wish.
1365  * }
1366  *
1367  *
1368  * For this common implementation to work, the machine layer must provide the
1369  * following:
1370  *
1371  * void *CmiGetNonLocal()
1372  *
1373  *      - returns a message just retrieved from some other PE, not from
1374  *        local.  If no such message exists, returns 0.
1375  *
1376  * CpvExtern(FIFO_Queue, CmiLocalQueue);
1377  *
1378  *      - a FIFO queue containing all messages from the local processor.
1379  *
1380  *****************************************************************************/
1381
1382 CpvDeclare(CmiHandler, CsdNotifyIdle);
1383 CpvDeclare(CmiHandler, CsdNotifyBusy);
1384 CpvDeclare(int, CsdStopNotifyFlag);
1385 CpvStaticDeclare(int, CsdIdleDetectedFlag);
1386
1387 void CsdEndIdle()
1388 {
1389   if(CpvAccess(CsdIdleDetectedFlag)) {
1390     CpvAccess(CsdIdleDetectedFlag) = 0;
1391     if(!CpvAccess(CsdStopNotifyFlag)) {
1392       (CpvAccess(CsdNotifyBusy))();
1393 #ifndef CMK_OPTIMIZE
1394       if(CpvAccess(traceOn))
1395         traceEndIdle();
1396 #endif
1397     }
1398   }
1399 #if CMK_WEB_MODE
1400   usageStart();  
1401 #endif
1402 }
1403
1404 void CsdBeginIdle()
1405 {
1406   if (!CpvAccess(CsdIdleDetectedFlag)) {
1407     CpvAccess(CsdIdleDetectedFlag) = 1;
1408     if(!CpvAccess(CsdStopNotifyFlag)) {
1409       (CpvAccess(CsdNotifyIdle))();
1410 #ifndef CMK_OPTIMIZE
1411       if(CpvAccess(traceOn))
1412         traceBeginIdle();
1413 #endif
1414     }
1415   }
1416 #if CMK_WEB_MODE
1417   usageStop();  
1418 #endif
1419   CmiNotifyIdle();
1420   CcdRaiseCondition(CcdPROCESSORIDLE) ;
1421 }
1422   
1423 #if CMK_CMIDELIVERS_USE_COMMON_CODE
1424
1425 CtvStaticDeclare(int, CmiBufferGrabbed);
1426
1427 void CmiGrabBuffer(void **bufptrptr)
1428 {
1429   CtvAccess(CmiBufferGrabbed) = 1;
1430 }
1431
1432 void CmiReleaseBuffer(void *buffer)
1433 {
1434   CmiGrabBuffer(&buffer);
1435   CmiFree(buffer);
1436 }
1437
1438 void CmiHandleMessage(void *msg)
1439 {
1440 #if CMK_DEBUG_MODE
1441   char *freezeReply;
1442   int fd;
1443
1444   extern int skt_connect(unsigned int, int, int);
1445   extern void writeall(int, char *, int);
1446 #endif
1447
1448   CtvAccess(CmiBufferGrabbed) = 0;
1449
1450 #if CMK_DEBUG_MODE
1451   
1452   if(CpvAccess(continueFlag) && (isBreakPoint((char *)msg))) {
1453
1454     if(breakPointHeader != 0){
1455       free(breakPointHeader);
1456       breakPointHeader = 0;
1457     }
1458     if(breakPointContents != 0){
1459       free(breakPointContents);
1460       breakPointContents = 0;
1461     }
1462     
1463     breakPointHeader = genericViewMsgFunction((char *)msg, 0);
1464     breakPointContents = genericViewMsgFunction((char *)msg, 1);
1465
1466     CmiPrintf("BREAKPOINT REACHED :\n");
1467     CmiPrintf("Header : %s\nContents : %s\n", breakPointHeader, breakPointContents);
1468
1469     /* Freeze and send a message back */
1470     CpdFreeze();
1471     freezeReply = (char *)malloc(strlen("freezing@")+strlen(breakPointHeader)+1);
1472     sprintf(freezeReply, "freezing@%s", breakPointHeader);
1473     fd = skt_connect(freezeIP, freezePort, 120);
1474     if(fd > 0){
1475       writeall(fd, freezeReply, strlen(freezeReply) + 1);
1476       close(fd);
1477     } else {
1478       CmiPrintf("unable to connect");
1479     }
1480     free(freezeReply);
1481     CpvAccess(continueFlag) = 0;
1482   } else if(CpvAccess(stepFlag) && (isEntryPoint((char *)msg))){
1483     if(breakPointHeader != 0){
1484       free(breakPointHeader);
1485       breakPointHeader = 0;
1486     }
1487     if(breakPointContents != 0){
1488       free(breakPointContents);
1489       breakPointContents = 0;
1490     }
1491
1492     breakPointHeader = genericViewMsgFunction((char *)msg, 0);
1493     breakPointContents = genericViewMsgFunction((char *)msg, 1);
1494
1495     CmiPrintf("STEP POINT REACHED :\n");
1496     CmiPrintf("Header:%s\nContents:%s\n",breakPointHeader,breakPointContents);
1497
1498     /* Freeze and send a message back */
1499     CpdFreeze();
1500     freezeReply = (char *)malloc(strlen("freezing@")+strlen(breakPointHeader)+1);
1501     sprintf(freezeReply, "freezing@%s", breakPointHeader);
1502     fd = skt_connect(freezeIP, freezePort, 120);
1503     if(fd > 0){
1504       writeall(fd, freezeReply, strlen(freezeReply) + 1);
1505       close(fd);
1506     } else {
1507       CmiPrintf("unable to connect");
1508     }
1509     free(freezeReply);
1510     CpvAccess(stepFlag) = 0;
1511   }
1512 #endif  
1513   (CmiGetHandlerFunction(msg))(msg);
1514   if (!CtvAccess(CmiBufferGrabbed)) CmiFree(msg);
1515 }
1516
1517 void CmiDeliversInit()
1518 {
1519   CtvInitialize(int, CmiBufferGrabbed);
1520   CtvAccess(CmiBufferGrabbed) = 0;
1521 }
1522
1523 int CmiDeliverMsgs(int maxmsgs)
1524 {
1525   return CsdScheduler(maxmsgs);
1526 }
1527
1528 int CsdScheduler(int maxmsgs)
1529 {
1530   int *msg;
1531   void *localqueue = CpvAccess(CmiLocalQueue);
1532   int cycle = CpvAccess(CsdStopFlag);
1533   
1534 #if CMK_DEBUG_MODE
1535   /* To allow start in freeze state */
1536   msgListCleanup();
1537   msgListCache();
1538 #endif
1539
1540   if(maxmsgs == 0) {
1541     while(1) {
1542 #if NODE_0_IS_CONVHOST
1543       if(hostskt_ready_read) CHostGetOne();
1544 #endif
1545       msg = CmiGetNonLocal();
1546 #if CMK_DEBUG_MODE
1547       if(CpvAccess(freezeModeFlag)==1){
1548
1549         /* Check if the msg is an debug message to let it go
1550            else, enqueue in the FIFO
1551         */
1552
1553         if(msg != 0){
1554           if(strncmp((char *)((char *)msg+CmiMsgHeaderSizeBytes),"req",3)!=0) {
1555             CsdEndIdle();
1556             FIFO_EnQueue(CpvAccess(debugQueue), msg);
1557             continue;
1558           }
1559         }
1560       } else {
1561         /* If the debugQueue contains any messages, process them */
1562         while((!FIFO_Empty(CpvAccess(debugQueue))) && (CpvAccess(freezeModeFlag)==0)){
1563           char *queuedMsg;
1564           FIFO_DeQueue(CpvAccess(debugQueue), &queuedMsg);
1565           CmiHandleMessage(queuedMsg);
1566           maxmsgs--; if (maxmsgs==0) return maxmsgs;
1567         }
1568       }
1569 #endif
1570       if (msg==0) FIFO_DeQueue(localqueue, &msg);
1571 #if CMK_NODE_QUEUE_AVAILABLE
1572       if (msg==0) {
1573         CmiLock(CsvAccess(NodeQueueLock));
1574         msg = CmiGetNonLocalNodeQ();
1575         if (msg==0 && 
1576             !CqsPrioGT(CqsGetPriority(CsvAccess(CsdNodeQueue)), 
1577                        CqsGetPriority(CpvAccess(CsdSchedQueue)))) {
1578           CqsDequeue(CsvAccess(CsdNodeQueue),&msg);
1579         }
1580         CmiUnlock(CsvAccess(NodeQueueLock));
1581       }
1582 #endif
1583       if (msg==0) CqsDequeue(CpvAccess(CsdSchedQueue),&msg);
1584       if (msg) {
1585         CmiHandleMessage(msg);
1586         maxmsgs--;
1587         if (CpvAccess(CsdStopFlag) != cycle) return maxmsgs;
1588       } else {
1589         return maxmsgs;
1590       }
1591     }
1592   }
1593
1594   while (1) {
1595 #if NODE_0_IS_CONVHOST
1596     if(hostskt_ready_read) CHostGetOne();
1597 #endif
1598     msg = CmiGetNonLocal();
1599 #if CMK_DEBUG_MODE
1600     if(CpvAccess(freezeModeFlag) == 1){
1601       
1602       /* Check if the msg is an debug message to let it go
1603          else, enqueue in the FIFO 
1604       */
1605
1606       if(msg != 0){
1607         if(strncmp((char *)((char *)msg+CmiMsgHeaderSizeBytes),"req",3)!=0){
1608           CsdEndIdle();
1609           FIFO_EnQueue(CpvAccess(debugQueue), msg);
1610           continue;
1611         }
1612       } 
1613     } else {
1614       /* If the debugQueue contains any messages, process them */
1615       while(((!FIFO_Empty(CpvAccess(debugQueue))) && (CpvAccess(freezeModeFlag)==0))){
1616         char *queuedMsg;
1617         FIFO_DeQueue(CpvAccess(debugQueue), &queuedMsg);
1618         CmiHandleMessage(queuedMsg);
1619         maxmsgs--; if (maxmsgs==0) return maxmsgs;      
1620       }
1621     }
1622 #endif
1623     if (msg==0) FIFO_DeQueue(localqueue, &msg);
1624 #if CMK_NODE_QUEUE_AVAILABLE
1625     if (msg==0) {
1626       CmiLock(CsvAccess(NodeQueueLock));
1627       msg = CmiGetNonLocalNodeQ();
1628       if (msg==0 && 
1629             !CqsPrioGT(CqsGetPriority(CsvAccess(CsdNodeQueue)), 
1630                        CqsGetPriority(CpvAccess(CsdSchedQueue)))) {
1631           CqsDequeue(CsvAccess(CsdNodeQueue),&msg);
1632       }
1633       CmiUnlock(CsvAccess(NodeQueueLock));
1634     }
1635
1636 #endif
1637     if (msg==0) CqsDequeue(CpvAccess(CsdSchedQueue),&msg);
1638     if (msg) {
1639       CsdEndIdle();
1640       CmiHandleMessage(msg);
1641       maxmsgs--; if (maxmsgs==0) return maxmsgs;
1642       if (CpvAccess(CsdStopFlag) != cycle) return maxmsgs;
1643     } else {
1644       CsdBeginIdle();
1645       if (CpvAccess(CsdStopFlag) != cycle) {
1646         CsdEndIdle();
1647         return maxmsgs;
1648       }
1649     }
1650     if (!CpvAccess(disable_sys_msgs))
1651       if (CpvAccess(CcdNumChecks) > 0)
1652         CcdCallBacks();
1653   }
1654 }
1655
1656 void CmiDeliverSpecificMsg(handler)
1657 int handler;
1658 {
1659   int *msg; int side;
1660   void *localqueue = CpvAccess(CmiLocalQueue);
1661  
1662   side = 0;
1663   while (1) {
1664     side ^= 1;
1665     if (side) msg = CmiGetNonLocal();
1666     else      FIFO_DeQueue(localqueue, &msg);
1667     if (msg) {
1668       if (CmiGetHandler(msg)==handler) {
1669         CsdEndIdle();
1670         CmiHandleMessage(msg);
1671         return;
1672       } else {
1673         FIFO_EnQueue(localqueue, msg);
1674       }
1675     }
1676   }
1677 }
1678  
1679 #endif /* CMK_CMIDELIVERS_USE_COMMON_CODE */
1680
1681 /***************************************************************************
1682  *
1683  * Standin Schedulers.
1684  *
1685  * We use the following strategy to make sure somebody's always running
1686  * the scheduler (CsdScheduler).  Initially, we assume the main thread
1687  * is responsible for this.  If the main thread blocks, we create a
1688  * "standin scheduler" thread to replace it.  If the standin scheduler
1689  * blocks, we create another standin scheduler to replace that one,
1690  * ad infinitum.  Collectively, the main thread and all the standin
1691  * schedulers are called "scheduling threads".
1692  *
1693  * Suppose the main thread is blocked waiting for data, and a standin
1694  * scheduler is running instead.  Suppose, then, that the data shows
1695  * up and the main thread is CthAwakened.  This causes a token to be
1696  * pushed into the queue.  When the standin pulls the token from the
1697  * queue and handles it, the standin goes to sleep, and control shifts
1698  * back to the main thread.  In this way, unnecessary standins are put
1699  * back to sleep.  These sleeping standins are stored on the
1700  * CthSleepingStandins list.
1701  *
1702  ***************************************************************************/
1703
1704 CpvStaticDeclare(CthThread, CthMainThread);
1705 CpvStaticDeclare(CthThread, CthSchedulingThread);
1706 CpvStaticDeclare(CthThread, CthSleepingStandins);
1707 CpvStaticDeclare(int      , CthResumeNormalThreadIdx);
1708 CpvStaticDeclare(int      , CthResumeSchedulingThreadIdx);
1709
1710 /** addition for tracing */
1711 CpvDeclare(CthThread, curThread);
1712 /* end addition */
1713
1714 static void CthStandinCode()
1715 {
1716   while (1) CsdScheduler(0);
1717 }
1718
1719 static CthThread CthSuspendNormalThread()
1720 {
1721   return CpvAccess(CthSchedulingThread);
1722 }
1723
1724 static void CthEnqueueSchedulingThread(CthThread t);
1725 static CthThread CthSuspendSchedulingThread();
1726
1727 static CthThread CthSuspendSchedulingThread()
1728 {
1729   CthThread succ = CpvAccess(CthSleepingStandins);
1730   CthThread me = CthSelf();
1731
1732   if (succ) {
1733     CpvAccess(CthSleepingStandins) = CthGetNext(succ);
1734   } else {
1735     succ = CthCreate(CthStandinCode, 0, 256000);
1736     CthSetStrategy(succ,
1737                    CthEnqueueSchedulingThread,
1738                    CthSuspendSchedulingThread);
1739   }
1740   
1741   CpvAccess(CthSchedulingThread) = succ;
1742   return succ;
1743 }
1744
1745 static void CthResumeNormalThread(CthThread t)
1746 {
1747   CmiGrabBuffer((void**)&t);
1748   /** addition for tracing */
1749   CpvAccess(curThread) = t;
1750 #ifndef CMK_OPTIMIZE
1751   if(CpvAccess(traceOn))
1752     traceResume();
1753 #endif
1754   /* end addition */
1755 #if CMK_WEB_MODE
1756   usageStart();  
1757 #endif
1758   CthResume(t);
1759 }
1760
1761 static void CthResumeSchedulingThread(CthThread t)
1762 {
1763   CthThread me = CthSelf();
1764   CmiGrabBuffer((void**)&t);
1765   if (me == CpvAccess(CthMainThread)) {
1766     CthEnqueueSchedulingThread(me);
1767   } else {
1768     CthSetNext(me, CpvAccess(CthSleepingStandins));
1769     CpvAccess(CthSleepingStandins) = me;
1770   }
1771   CpvAccess(CthSchedulingThread) = t;
1772   CthResume(t);
1773 }
1774
1775 static void CthEnqueueNormalThread(CthThread t)
1776 {
1777   CmiSetHandler(t, CpvAccess(CthResumeNormalThreadIdx));
1778   CsdEnqueueFifo(t);
1779 }
1780
1781 static void CthEnqueueSchedulingThread(CthThread t)
1782 {
1783   CmiSetHandler(t, CpvAccess(CthResumeSchedulingThreadIdx));
1784   CsdEnqueueFifo(t);
1785 }
1786
1787 void CthSetStrategyDefault(CthThread t)
1788 {
1789   CthSetStrategy(t,
1790                  CthEnqueueNormalThread,
1791                  CthSuspendNormalThread);
1792 }
1793
1794 void CthSchedInit()
1795 {
1796   CpvInitialize(CthThread, CthMainThread);
1797   CpvInitialize(CthThread, CthSchedulingThread);
1798   CpvInitialize(CthThread, CthSleepingStandins);
1799   CpvInitialize(int      , CthResumeNormalThreadIdx);
1800   CpvInitialize(int      , CthResumeSchedulingThreadIdx);
1801
1802   CpvInitialize(CthThread, curThread);
1803
1804   CpvAccess(CthMainThread) = CthSelf();
1805   CpvAccess(CthSchedulingThread) = CthSelf();
1806   CpvAccess(CthSleepingStandins) = 0;
1807   CpvAccess(CthResumeNormalThreadIdx) =
1808     CmiRegisterHandler(CthResumeNormalThread);
1809   CpvAccess(CthResumeSchedulingThreadIdx) =
1810     CmiRegisterHandler(CthResumeSchedulingThread);
1811   CthSetStrategy(CthSelf(),
1812                  CthEnqueueSchedulingThread,
1813                  CthSuspendSchedulingThread);
1814 }
1815
1816 void CsdInit(argv)
1817   char **argv;
1818 {
1819   void *CqsCreate();
1820
1821   CpvInitialize(int,   disable_sys_msgs);
1822   CpvInitialize(void*, CsdSchedQueue);
1823 #if CMK_NODE_QUEUE_AVAILABLE
1824   CsvInitialize(void*, CsdNodeQueue);
1825   CsvInitialize(CmiNodeLock, NodeQueueLock);
1826 #endif
1827   CpvInitialize(int,   CsdStopFlag);
1828   CpvInitialize(int,   CsdStopNotifyFlag);
1829   CpvInitialize(int,   CsdIdleDetectedFlag);
1830   CpvInitialize(CmiHandler,   CsdNotifyIdle);
1831   CpvInitialize(CmiHandler,   CsdNotifyBusy);
1832   
1833   CpvAccess(disable_sys_msgs) = 0;
1834   CpvAccess(CsdSchedQueue) = CqsCreate();
1835
1836 #if CMK_NODE_QUEUE_AVAILABLE
1837   if (CmiMyRank() ==0) {
1838         CsvAccess(NodeQueueLock) = CmiCreateLock();
1839         CsvAccess(CsdNodeQueue) = CqsCreate();
1840   }
1841   CmiNodeBarrier();
1842 #endif
1843
1844   CpvAccess(CsdStopFlag)  = 0;
1845   CpvAccess(CsdStopNotifyFlag) = 1;
1846   CpvAccess(CsdIdleDetectedFlag) = 0;
1847 }
1848
1849
1850 /*****************************************************************************
1851  *
1852  * Vector Send
1853  *
1854  ****************************************************************************/
1855
1856 #if CMK_VECTOR_SEND_USES_COMMON_CODE
1857
1858 void CmiSyncVectorSend(destPE, n, sizes, msgs)
1859 int destPE, n;
1860 int *sizes;
1861 char **msgs;
1862 {
1863   int i, total;
1864   char *mesg, *tmp;
1865   
1866   for(i=0,total=0;i<n;i++) total += sizes[i];
1867   mesg = (char *) CmiAlloc(total);
1868   for(i=0,tmp=mesg;i<n;i++) {
1869     memcpy(tmp, msgs[i],sizes[i]);
1870     tmp += sizes[i];
1871   }
1872   CmiSyncSendAndFree(destPE, total, mesg);
1873 }
1874
1875 CmiCommHandle CmiAsyncVectorSend(destPE, n, sizes, msgs)
1876 int destPE, n;
1877 int *sizes;
1878 char **msgs;
1879 {
1880   CmiSyncVectorSend(destPE,n,sizes,msgs);
1881   return NULL;
1882 }
1883
1884 void CmiSyncVectorSendAndFree(destPE, n, sizes, msgs)
1885 int destPE, n;
1886 int *sizes;
1887 char **msgs;
1888 {
1889   int i;
1890
1891   CmiSyncVectorSend(destPE,n,sizes,msgs);
1892   for(i=0;i<n;i++) CmiFree(msgs[i]);
1893   CmiFree(sizes);
1894   CmiFree(msgs);
1895 }
1896
1897 #endif
1898
1899 /*****************************************************************************
1900  *
1901  * Multicast groups
1902  *
1903  ****************************************************************************/
1904
1905 #if CMK_MULTICAST_DEF_USE_COMMON_CODE
1906
1907 typedef struct GroupDef
1908 {
1909   union {
1910     char core[CmiMsgHeaderSizeBytes];
1911     struct GroupDef *next;
1912   } core;
1913   CmiGroup group;
1914   int npes;
1915   int pes[1];
1916 }
1917 *GroupDef;
1918
1919 #define GROUPTAB_SIZE 101
1920
1921 CpvStaticDeclare(int, CmiGroupHandlerIndex);
1922 CpvStaticDeclare(int, CmiGroupCounter);
1923 CpvStaticDeclare(GroupDef *, CmiGroupTable);
1924
1925 void CmiGroupHandler(GroupDef def)
1926 {
1927   /* receive group definition, insert into group table */
1928   GroupDef *table = CpvAccess(CmiGroupTable);
1929   unsigned int hashval, bucket;
1930   CmiGrabBuffer((void*)&def);
1931   hashval = (def->group.id ^ def->group.pe);
1932   bucket = hashval % GROUPTAB_SIZE;
1933   def->core.next = table[bucket];
1934   table[bucket] = def;
1935 }
1936
1937 CmiGroup CmiEstablishGroup(int npes, int *pes)
1938 {
1939   /* build new group definition, broadcast it */
1940   CmiGroup grp; GroupDef def; int len, i;
1941   grp.id = CpvAccess(CmiGroupCounter)++;
1942   grp.pe = CmiMyPe();
1943   len = sizeof(struct GroupDef)+(npes*sizeof(int));
1944   def = (GroupDef)CmiAlloc(len);
1945   def->group = grp;
1946   def->npes = npes;
1947   for (i=0; i<npes; i++)
1948     def->pes[i] = pes[i];
1949   CmiSetHandler(def, CpvAccess(CmiGroupHandlerIndex));
1950   CmiSyncBroadcastAllAndFree(len, def);
1951   return grp;
1952 }
1953
1954 void CmiLookupGroup(CmiGroup grp, int *npes, int **pes)
1955 {
1956   unsigned int hashval, bucket;  GroupDef def;
1957   GroupDef *table = CpvAccess(CmiGroupTable);
1958   hashval = (grp.id ^ grp.pe);
1959   bucket = hashval % GROUPTAB_SIZE;
1960   for (def=table[bucket]; def; def=def->core.next) {
1961     if ((def->group.id == grp.id)&&(def->group.pe == grp.pe)) {
1962       *npes = def->npes;
1963       *pes = def->pes;
1964       return;
1965     }
1966   }
1967   *npes = 0; *pes = 0;
1968 }
1969
1970 void CmiGroupInit()
1971 {
1972   CpvInitialize(int, CmiGroupHandlerIndex);
1973   CpvInitialize(int, CmiGroupCounter);
1974   CpvInitialize(GroupDef *, CmiGroupTable);
1975   CpvAccess(CmiGroupHandlerIndex) = CmiRegisterHandler(CmiGroupHandler);
1976   CpvAccess(CmiGroupCounter) = 0;
1977   CpvAccess(CmiGroupTable) =
1978     (GroupDef*)calloc(GROUPTAB_SIZE, sizeof(GroupDef));
1979   if (CpvAccess(CmiGroupTable) == 0)
1980     CmiAbort("Memory Allocation Error");
1981 }
1982
1983 #endif
1984
1985 /*****************************************************************************
1986  *
1987  * Common List-Cast and Multicast Code
1988  *
1989  ****************************************************************************/
1990
1991 #if CMK_MULTICAST_LIST_USE_COMMON_CODE
1992
1993 void CmiSyncListSendFn(int npes, int *pes, int len, char *msg)
1994 {
1995   CmiError("ListSend not implemented.");
1996 }
1997
1998 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int len, char *msg)
1999 {
2000   CmiError("ListSend not implemented.");
2001   return (CmiCommHandle) 0;
2002 }
2003
2004 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
2005 {
2006   CmiError("ListSend not implemented.");
2007 }
2008
2009 #endif
2010
2011 #if CMK_MULTICAST_GROUP_USE_COMMON_CODE
2012
2013 typedef struct MultiMsg
2014 {
2015   char core[CmiMsgHeaderSizeBytes];
2016   CmiGroup group;
2017   int pos;
2018   int origlen;
2019 }
2020 *MultiMsg;
2021
2022 CpvDeclare(int, CmiMulticastHandlerIndex);
2023
2024 void CmiMulticastDeliver(MultiMsg msg)
2025 {
2026   int npes, *pes; int olen, nlen, pos, child1, child2;
2027   olen = msg->origlen;
2028   nlen = olen + sizeof(struct MultiMsg);
2029   CmiLookupGroup(msg->group, &npes, &pes);
2030   if (pes==0) {
2031     CmiSyncSendAndFree(CmiMyPe(), nlen, msg);
2032     return;
2033   }
2034   if (npes==0) {
2035     CmiFree(msg);
2036     return;
2037   }
2038   if (msg->pos == -1) {
2039     msg->pos=0;
2040     CmiSyncSendAndFree(pes[0], nlen, msg);
2041     return;
2042   }
2043   pos = msg->pos;
2044   child1 = ((pos+1)<<1);
2045   child2 = child1-1;
2046   if (child1 < npes) {
2047     msg->pos = child1;
2048     CmiSyncSend(pes[child1], nlen, msg);
2049   }
2050   if (child2 < npes) {
2051     msg->pos = child2;
2052     CmiSyncSend(pes[child2], nlen, msg);
2053   }
2054   if(olen < sizeof(struct MultiMsg)) {
2055     memcpy(msg, msg+1, olen);
2056   } else {
2057     memcpy(msg, (((char*)msg)+olen), sizeof(struct MultiMsg));
2058   }
2059   CmiSyncSendAndFree(CmiMyPe(), olen, msg);
2060 }
2061
2062 void CmiMulticastHandler(MultiMsg msg)
2063 {
2064   CmiGrabBuffer((void*)&msg);
2065   CmiMulticastDeliver(msg);
2066 }
2067
2068 void CmiSyncMulticastFn(CmiGroup grp, int len, char *msg)
2069 {
2070   int newlen; MultiMsg newmsg;
2071   newlen = len + sizeof(struct MultiMsg);
2072   newmsg = (MultiMsg)CmiAlloc(newlen);
2073   if(len < sizeof(struct MultiMsg)) {
2074     memcpy(newmsg+1, msg, len);
2075   } else {
2076     memcpy(newmsg+1, msg+sizeof(struct MultiMsg), len-sizeof(struct MultiMsg));
2077     memcpy(((char *)newmsg+len), msg, sizeof(struct MultiMsg));
2078   }
2079   newmsg->group = grp;
2080   newmsg->origlen = len;
2081   newmsg->pos = -1;
2082   CmiSetHandler(newmsg, CpvAccess(CmiMulticastHandlerIndex));
2083   CmiMulticastDeliver(newmsg);
2084 }
2085
2086 void CmiFreeMulticastFn(CmiGroup grp, int len, char *msg)
2087 {
2088   CmiSyncMulticastFn(grp, len, msg);
2089   CmiFree(msg);
2090 }
2091
2092 CmiCommHandle CmiAsyncMulticastFn(CmiGroup grp, int len, char *msg)
2093 {
2094   CmiError("Async Multicast not implemented.");
2095   return (CmiCommHandle) 0;
2096 }
2097
2098 void CmiMulticastInit()
2099 {
2100   CpvInitialize(int, CmiMulticastHandlerIndex);
2101   CpvAccess(CmiMulticastHandlerIndex) =
2102     CmiRegisterHandler(CmiMulticastHandler);
2103 }
2104
2105 #endif
2106
2107 /***************************************************************************
2108  *
2109  * Memory Allocation routines 
2110  *
2111  * A block of memory can consist of multiple chunks.  Each chunk has
2112  * a sizefield and a refcount.  The first chunk's refcount is a reference
2113  * count.  That's how many CmiFrees it takes to free the message.
2114  * Subsequent chunks have a refcount which is less than zero.  This is
2115  * the offset back to the start of the first chunk.
2116  *
2117  ***************************************************************************/
2118
2119 #define SIZEFIELD(m) ((int *)((char *)(m)-2*sizeof(int)))[0]
2120 #define REFFIELD(m) ((int *)((char *)(m)-sizeof(int)))[0]
2121 #define BLKSTART(m) ((char *)m-2*sizeof(int))
2122
2123 void *CmiAlloc(size)
2124 int size;
2125 {
2126   char *res;
2127   res =(char *)malloc(size+2*sizeof(int));
2128   if (res==0) CmiAbort("Memory allocation failed.");
2129
2130 #ifdef MEMMONITOR
2131   CpvAccess(MemoryUsage) += size+2*sizeof(int);
2132   CpvAccess(AllocCount)++;
2133   CpvAccess(BlocksAllocated)++;
2134   if (CpvAccess(MemoryUsage) > CpvAccess(HiWaterMark)) {
2135     CpvAccess(HiWaterMark) = CpvAccess(MemoryUsage);
2136   }
2137   if (CpvAccess(MemoryUsage) > 1.1 * CpvAccess(ReportedHiWaterMark)) {
2138     CmiPrintf("HIMEM STAT PE%d: %d Allocs, %d blocks, %lu K, Max %lu K\n",
2139             CmiMyPe(), CpvAccess(AllocCount), CpvAccess(BlocksAllocated),
2140             CpvAccess(MemoryUsage)/1024, CpvAccess(HiWaterMark)/1024);
2141     CpvAccess(ReportedHiWaterMark) = CpvAccess(MemoryUsage);
2142   }
2143   if ((CpvAccess(AllocCount) % 1000) == 0) {
2144     CmiPrintf("MEM STAT PE%d: %d Allocs, %d blocks, %lu K, Max %lu K\n",
2145             CmiMyPe(), CpvAccess(AllocCount), CpvAccess(BlocksAllocated),
2146             CpvAccess(MemoryUsage)/1024, CpvAccess(HiWaterMark)/1024);
2147   }
2148 #endif
2149
2150   ((int *)res)[0]=size;
2151   ((int *)res)[1]=1;
2152   return (void *)(res+2*sizeof(int));
2153 }
2154
2155 void CmiReference(blk)
2156 void *blk;
2157 {
2158   int refCount = REFFIELD(blk);
2159   if (refCount < 0) {
2160     blk = (void *)((char*)blk+refCount);
2161     refCount = REFFIELD(blk);
2162   }
2163   REFFIELD(blk) = refCount+1;
2164 }
2165
2166 int CmiSize(blk)
2167 void *blk;
2168 {
2169   return SIZEFIELD(blk);
2170 }
2171
2172 void CmiFree(blk)
2173 void *blk;
2174 {
2175   int refCount;
2176
2177   refCount = REFFIELD(blk);
2178   if (refCount < 0) {
2179     blk = (void *)((char*)blk+refCount);
2180     refCount = REFFIELD(blk);
2181   }
2182   if(refCount==0) {
2183 #ifdef MEMMONITOR
2184     if (SIZEFIELD(blk) > 100000)
2185       CmiPrintf("MEMSTAT Uh-oh -- SIZEFIELD=%d\n",SIZEFIELD(blk));
2186     CpvAccess(MemoryUsage) -= (SIZEFIELD(blk) + 2*sizeof(int));
2187     CpvAccess(BlocksAllocated)--;
2188     CmiPrintf("Refcount 0 case called\n");
2189 #endif
2190     free(BLKSTART(blk));
2191     return;
2192   }
2193   refCount--;
2194   if(refCount==0) {
2195 #ifdef MEMMONITOR
2196     if (SIZEFIELD(blk) > 100000)
2197       CmiPrintf("MEMSTAT Uh-oh -- SIZEFIELD=%d\n",SIZEFIELD(blk));
2198     CpvAccess(MemoryUsage) -= (SIZEFIELD(blk) + 2*sizeof(int));
2199     CpvAccess(BlocksAllocated)--;
2200 #endif
2201     free(BLKSTART(blk));
2202     return;
2203   }
2204   REFFIELD(blk) = refCount;
2205 }
2206
2207 /******************************************************************************
2208
2209   Multiple Send function                               
2210
2211   ****************************************************************************/
2212
2213 CpvDeclare(int, CmiMainHandlerIDP); /* Main handler that is run on every node */
2214
2215 /****************************************************************************
2216 * DESCRIPTION : This function call allows the user to send multiple messages
2217 *               from one processor to another, all intended for differnet 
2218 *               handlers.
2219 *
2220 *               Parameters :
2221 *
2222 *               destPE, len, int sizes[], char *messages[]
2223 *
2224 * ASSUMPTION  : The sizes[] and the messages[] array begin their indexing FROM 1.
2225 *               (i.e They should have memory allocated for n + 1)
2226 *               This is important to ensure that the call works correctly
2227 *
2228 ****************************************************************************/
2229
2230 void CmiMultipleSend(unsigned int destPE, int len, int sizes[], char *msgComps[])
2231 {
2232   char *header;
2233   int i;
2234   int *newSizes;
2235   char **newMsgComps;
2236   int mask = ~7; /* to mask off the last 3 bits */
2237   char *pad = "                 "; /* padding required - 16 bytes long w.case */
2238
2239   /* Allocate memory for the newSizes array and the newMsgComps array*/
2240   newSizes = (int *)CmiAlloc(2 * (len + 1) * sizeof(int));
2241   newMsgComps = (char **)CmiAlloc(2 * (len + 1) * sizeof(char *));
2242
2243   /* Construct the newSizes array from the old sizes array */
2244   newSizes[0] = (CmiMsgHeaderSizeBytes + (len + 1)*sizeof(int));
2245   newSizes[1] = ((CmiMsgHeaderSizeBytes + (len + 1)*sizeof(int) + 7)&mask) - newSizes[0] + 2*sizeof(int);
2246                      /* To allow the extra 8 bytes for the CmiSize & the Ref Count */
2247
2248   for(i = 1; i < len + 1; i++){
2249     newSizes[2*i] = (sizes[i - 1]);
2250     newSizes[2*i + 1] = ((sizes[i -1] + 7)&mask) - newSizes[2*i] + 2*sizeof(int); 
2251              /* To allow the extra 8 bytes for the CmiSize & the Ref Count */
2252   }
2253     
2254   header = (char *)CmiAlloc(newSizes[0]*sizeof(char));
2255
2256   /* Set the len field in the buffer */
2257   *(int *)(header + CmiMsgHeaderSizeBytes) = len;
2258
2259   /* and the induvidual lengths */
2260   for(i = 1; i < len + 1; i++){
2261     *((int *)(header + CmiMsgHeaderSizeBytes) + i) = newSizes[2*i] + newSizes[2*i + 1];
2262   }
2263
2264   /* This message shd be recd by the main handler */
2265   CmiSetHandler(header, CpvAccess(CmiMainHandlerIDP));
2266   newMsgComps[0] = header;
2267   newMsgComps[1] = pad;
2268
2269   for(i = 1; i < (len + 1); i++){
2270     newMsgComps[2*i] =  msgComps[i - 1];
2271     newMsgComps[2*i + 1] = pad;
2272   }
2273
2274   CmiSyncVectorSend(destPE, 2*(len + 1), newSizes, newMsgComps);
2275   CmiFree(newSizes);
2276   CmiFree(newMsgComps);
2277   CmiFree(header);
2278 }
2279
2280 /****************************************************************************
2281 * DESCRIPTION : This function initializes the main handler required for the
2282 *               CmiMultipleSendP() function to work. 
2283 *               
2284 *               This function should be called once in any Converse program
2285 *               that uses CmiMultipleSendP()
2286 *
2287 ****************************************************************************/
2288
2289 static void CmiMultiMsgHandler(char *msgWhole);
2290
2291 void CmiInitMultipleSend(void)
2292 {
2293   CpvInitialize(int,CmiMainHandlerIDP); 
2294   CpvAccess(CmiMainHandlerIDP) =
2295     CmiRegisterHandler((CmiHandler)CmiMultiMsgHandler);
2296 }
2297
2298 /****************************************************************************
2299 * DESCRIPTION : This function is the main handler required for the
2300 *               CmiMultipleSendP() function to work. 
2301 *
2302 ****************************************************************************/
2303
2304 static void memChop(char *msgWhole);
2305
2306 static void CmiMultiMsgHandler(char *msgWhole)
2307 {
2308   int len;
2309   int *sizes;
2310   int i;
2311   int offset;
2312   int mask = ~7; /* to mask off the last 3 bits */
2313   
2314   /* Number of messages */
2315   offset = CmiMsgHeaderSizeBytes;
2316   len = *(int *)(msgWhole + offset);
2317   offset += sizeof(int);
2318
2319   /* Allocate array to store sizes */
2320   sizes = (int *)(msgWhole + offset);
2321   offset += sizeof(int)*len;
2322
2323   /* This is needed since the header may or may not be aligned on an 8 bit boundary */
2324   offset = (offset + 7)&mask;
2325
2326   /* To cross the 8 bytes inserted in between */
2327   offset += 2*sizeof(int);
2328
2329   /* Call memChop() */
2330   memChop(msgWhole);
2331
2332   /* Send the messages to their respective handlers (on the same machine) */
2333   /* Currently uses CmiSyncSend(), later modify to use Scheduler enqueuing */
2334   for(i = 0; i < len; i++){
2335     CmiSyncSendAndFree(CmiMyPe(), sizes[i], ((char *)(msgWhole + offset))); 
2336     offset += sizes[i];
2337   }
2338 }
2339
2340 static void memChop(char *msgWhole)
2341 {
2342   int len;
2343   int *sizes;
2344   int i;
2345   int offset;
2346   int mask = ~7; /* to mask off the last 3 bits */
2347   
2348   /* Number of messages */
2349   offset = CmiMsgHeaderSizeBytes;
2350   len = *(int *)(msgWhole + offset);
2351   offset += sizeof(int);
2352
2353   /* Set Reference count in the CmiAlloc header*/
2354   /* Reference Count includes the header also, hence (len + 1) */
2355   ((int *)(msgWhole - sizeof(int)))[0] = len + 1;
2356
2357   /* Allocate array to store sizes */
2358   sizes = (int *)(msgWhole + offset);
2359   offset += sizeof(int)*len;
2360
2361   /* This is needed since the header may or may not be aligned on an 8 bit boundary */
2362   offset = (offset + 7)&mask;
2363
2364   /* To cross the 8 bytes inserted in between */
2365   offset += 2*sizeof(int);
2366
2367   /* update the sizes and offsets for all the chunks */
2368   for(i = 0; i < len; i++){
2369     /* put in the size value for that part */
2370     ((int *)(msgWhole + offset - 2*sizeof(int)))[0] = sizes[i] - 2*sizeof(int);
2371     
2372     /* now put in the offset (a negative value) to get right back to the begining */
2373     ((int *)(msgWhole + offset - sizeof(int)))[0] = (-1)*offset;
2374     
2375     offset += sizes[i];
2376   }
2377 }
2378
2379 /*****************************************************************************
2380  *
2381  * Converse Client-Server Functions
2382  *
2383  *****************************************************************************/
2384
2385 #if CMK_CCS_AVAILABLE
2386
2387 typedef struct CcsListNode {
2388   char name[32];
2389   int hdlr;
2390   struct CcsListNode *next;
2391 }CcsListNode;
2392
2393 CpvStaticDeclare(CcsListNode*, ccsList);
2394 CpvStaticDeclare(int, callerIP);
2395 CpvStaticDeclare(int, callerPort);
2396 CpvDeclare(int, strHandlerID);
2397
2398 static void CcsStringHandlerFn(char *msg)
2399 {
2400   char cmd[10], hdlrName[32], *cmsg, *omsg=msg;
2401   int ip, port, pe, size, nread, hdlrID;
2402   CcsListNode *list = CpvAccess(ccsList);
2403
2404   msg += CmiMsgHeaderSizeBytes;
2405   nread = sscanf(msg, "%s%d%d%d%d%s", 
2406                  cmd, &pe, &size, &ip, &port, hdlrName);
2407   if(nread!=6) CmiAbort("Garbled message from client");
2408   CmiPrintf("message for %s\n", hdlrName);
2409   while(list!=0) {
2410     if(strcmp(hdlrName, list->name)==0) {
2411       hdlrID = list->hdlr;
2412       break;
2413     }
2414     list = list->next;
2415   }
2416   if(list==0) CmiAbort("Invalid Service Request\n");
2417   while(*msg != '\n') msg++;
2418   msg++;
2419   cmsg = (char *) CmiAlloc(size+CmiMsgHeaderSizeBytes+1);
2420   memcpy(cmsg+CmiMsgHeaderSizeBytes, msg, size);
2421   cmsg[CmiMsgHeaderSizeBytes+size] = '\0';
2422
2423   CmiSetHandler(cmsg, hdlrID);
2424   CpvAccess(callerIP) = ip;
2425   CpvAccess(callerPort) = port;
2426   CmiHandleMessage(cmsg);
2427   CmiGrabBuffer((void **)&omsg);
2428   CmiFree(omsg);
2429   CpvAccess(callerIP) = 0;
2430 }
2431
2432 static void CcsInit(void)
2433 {
2434   CpvInitialize(CcsListNode*, ccsList);
2435   CpvAccess(ccsList) = 0;
2436   CpvInitialize(int, callerIP);
2437   CpvAccess(callerIP) = 0;
2438   CpvInitialize(int, callerPort);
2439   CpvAccess(callerPort) = 0;
2440   CpvInitialize(int, strHandlerID);
2441   CpvAccess(strHandlerID) = CmiRegisterHandler(CcsStringHandlerFn);
2442 }
2443
2444 void CcsUseHandler(char *name, int hdlr)
2445 {
2446   CcsListNode *list=CpvAccess(ccsList);
2447   if(list==0) {
2448     list = (CcsListNode *)malloc(sizeof(CcsListNode));
2449     CpvAccess(ccsList) = list;
2450   } else {
2451     while(list->next != 0) 
2452       list = list->next;
2453     list->next = (CcsListNode *)malloc(sizeof(CcsListNode));
2454     list = list->next;
2455   }
2456   strcpy(list->name, name);
2457   list->hdlr = hdlr;
2458   list->next = 0;
2459 }
2460
2461 int CcsRegisterHandler(char *name, CmiHandler fn)
2462 {
2463   int hdlr = CmiRegisterHandlerLocal(fn);
2464   CcsUseHandler(name, hdlr);
2465   return hdlr;
2466 }
2467
2468 int CcsEnabled(void)
2469 {
2470   return 1;
2471 }
2472
2473 int CcsIsRemoteRequest(void)
2474 {
2475   return (CpvAccess(callerIP) != 0);
2476 }
2477
2478 void CcsCallerId(unsigned int *pip, unsigned int *pport)
2479 {
2480   *pip = CpvAccess(callerIP);
2481   *pport = CpvAccess(callerPort);
2482 }
2483
2484 extern int skt_connect(unsigned int, int, int);
2485 extern void writeall(int, char *, int);
2486
2487 void CcsSendReply(unsigned int ip, unsigned int port, int size, void *msg)
2488 {
2489   char cmd[100];
2490   int fd;
2491
2492   fd = skt_connect(ip, port, 120);
2493   
2494   if (fd<0) {
2495       CmiPrintf("client Exited\n");
2496       return; /* maybe the requester exited */
2497   }
2498   sprintf(cmd, "reply %10d\n", size);
2499   writeall(fd, cmd, strlen(cmd));
2500   writeall(fd, msg, size);
2501
2502 #if CMK_SYNCHRONIZE_ON_TCP_CLOSE
2503   shutdown(fd, 1);
2504   while (read(fd, &c, 1)==EINTR);
2505   close(fd);
2506 #else
2507   close(fd);
2508 #endif
2509 }
2510
2511 #if CMK_USE_PERSISTENT_CCS
2512 void CcsSendReplyFd(unsigned int ip, unsigned int port, int size, void *msg)
2513 {
2514   char cmd[100];
2515   int fd;
2516
2517   fd = appletFd;
2518   if (fd<0) {
2519     CmiPrintf("client Exited\n");
2520     return; /* maybe the requester exited */
2521   }
2522   sprintf(cmd, "reply %10d\n", size);
2523   writeall(fd, cmd, strlen(cmd));
2524   writeall(fd, msg, size);
2525 #if CMK_SYNCHRONIZE_ON_TCP_CLOSE
2526   shutdown(fd, 1);
2527   while (read(fd, &c, 1)==EINTR);
2528 #endif
2529 }
2530 #endif
2531
2532 #endif
2533 /*****************************************************************************
2534  *
2535  * Converse Initialization
2536  *
2537  *****************************************************************************/
2538
2539 extern void CrnInit(void);
2540
2541 void ConverseCommonInit(char **argv)
2542 {
2543 #if NODE_0_IS_CONVHOST
2544   int i,j;
2545 #endif
2546   CmiTimerInit();
2547   CstatsInit(argv);
2548   CcdModuleInit(argv);
2549   CmiHandlerInit();
2550   CmiMemoryInit(argv);
2551   CmiDeliversInit();
2552   CsdInit(argv);
2553   CthSchedInit();
2554   CmiGroupInit();
2555   CmiMulticastInit();
2556   CmiInitMultipleSend();
2557   CcsInit();
2558 #if CMK_DEBUG_MODE
2559   CpdInit();
2560 #endif
2561 #if CMK_WEB_MODE
2562   CWebInit();
2563 #endif
2564 #if NODE_0_IS_CONVHOST
2565   CHostInit();
2566   skt_server(&hostport, &hostskt);
2567   CmiSignal(SIGALRM, SIGIO, 0, CommunicationServer);
2568   CmiEnableAsyncIO(hostskt);
2569   CHostRegister();
2570  
2571   if(CmiMyPe() == 0){
2572     char *ptr;
2573     i = 0;
2574     for(ptr = argv[i]; ptr != 0; i++, ptr = argv[i])
2575       if(strcmp(ptr, "++server") == 0) {
2576         serverFlag = 1;
2577         for(j = i; argv[j] != 0; j++)
2578           argv[j] = argv[j+1];
2579         break;
2580       }
2581   }
2582 #endif
2583   CldModuleInit();
2584 }
2585
2586 void ConverseCommonExit(void)
2587 {
2588 #if NODE_0_IS_CONVHOST
2589   if((CmiMyPe() == 0) && (clientIP != 0)){
2590     int fd;
2591     fd = skt_connect(clientIP, clientKillPort, 120);
2592     if (fd>0){ 
2593       write(fd, "die\n", strlen("die\n"));
2594     }
2595   }
2596 #endif
2597 #ifndef CMK_OPTIMIZE
2598   traceClose();
2599 #endif
2600 }
2601
2602