Merged arrays into Charj mainline.
[charm.git] / src / conv-core / convcore.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 /** 
8   @defgroup Converse
9   \brief Converse--a parallel portability layer.
10
11   Converse is the lowest level inside the Charm++ hierarchy. It stands on top
12   of the machine layer, and it provides all the common functionality across
13   platforms.
14
15   One converse program is running on every processor (or node in the smp
16   version). it manages the message transmission, and the memory allocation.
17   Charm++, which is on top of Converse, uses its functionality for
18   interprocess *communication.
19
20   In order to maintain multiple independent objects inside a single user space
21   program, it uses a personalized version of threads, which can be executed,
22   suspended, and migrated across processors.
23
24   It provides a scheduler for message delivery: methods can be registered to
25   the scheduler, and then messages allocated through CmiAlloc can be sent to
26   the correspondent method in a remote processor. This is done through the
27   converse header (which has few common fields, but is architecture dependent).
28
29   @defgroup ConverseScheduler 
30   \brief The portion of Converse responsible for scheduling the execution of 
31   incoming messages.
32   
33   Converse provides a scheduler for message delivery: methods can be registered to 
34   the scheduler, and then messages allocated through CmiAlloc can be sent to 
35   the correspondent method in a remote processor. This is done through the
36   converse header (which has few common fields, but is architecture dependent).
37
38   In converse the CsdScheduleForever() routine will run an infinite while loop that 
39   looks for available messages to processes from the message queue. Messages in the 
40   queue are dequeued by the CsdNextMessage() routine. When a 
41   message is taken from the queue it is then passed into CmiHandleMessage() which
42   calls the handler associated with the message.
43
44   Incoming messages that are destined for Charm++ will be passed to the 
45   \ref CharmScheduler "charm scheduling routines".
46
47   @file
48   converse main core
49   @ingroup Converse
50   @ingroup ConverseScheduler
51  
52   @addtogroup Converse
53   @{
54
55 */
56
57 #include <stdio.h>
58 #include <stdlib.h>
59 #include <string.h>
60 #include <errno.h>
61 #ifndef _WIN32
62 #include <sys/time.h>
63 #include <sys/resource.h>
64 #endif
65
66 #include "converse.h"
67 #include "conv-trace.h"
68 #include "sockRoutines.h"
69 #include "queueing.h"
70 #include "conv-ccs.h"
71 #include "ccs-server.h"
72 #include "memory-isomalloc.h"
73 #include "converseEvents.h"             /* projector */
74 #include "traceCoreCommon.h"    /* projector */
75 #include "machineEvents.h"     /* projector */
76
77 #if CMK_OUT_OF_CORE
78 #include "conv-ooc.h"
79 #endif
80
81 #if CONVERSE_POOL
82 #include "cmipool.h"
83 #endif
84
85 #if CMK_CONDS_USE_SPECIAL_CODE
86 CmiSwitchToPEFnPtr CmiSwitchToPE;
87 #endif
88
89 CpvExtern(int, _traceCoreOn);   /* projector */
90 extern void CcdModuleInit(char **);
91 extern void CmiMemoryInit(char **);
92 extern void CldModuleInit(char **);
93
94 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
95 #include <sys/types.h>
96 #include <sys/time.h>
97 #endif
98
99 #if CMK_TIMER_USE_TIMES
100 #include <sys/times.h>
101 #include <limits.h>
102 #include <unistd.h>
103 #endif
104
105 #if CMK_TIMER_USE_GETRUSAGE
106 #include <sys/time.h>
107 #include <sys/resource.h>
108 #endif
109
110 #if CMK_TIMER_USE_RDTSC
111 #include <string.h>
112 #include <unistd.h>
113 #include <time.h>
114 #include <stdio.h>
115 #include <sys/time.h>
116 #include <sys/resource.h>
117 #endif
118
119 #ifdef CMK_TIMER_USE_WIN32API
120 #include <stdlib.h>
121 #include <time.h>
122 #include <sys/types.h>
123 #include <sys/timeb.h>
124 #endif
125
126 #ifdef CMK_CUDA
127 #include "cuda-hybrid-api.h"
128 #endif
129
130 #include "quiescence.h"
131
132 int cur_restart_phase = 1;      /* checkpointing/restarting phase counter */
133
134 static int CsdLocalMax = CSD_LOCAL_MAX_DEFAULT;
135
136 CpvStaticDeclare(int, CmiMainHandlerIDP); /* Main handler for _CmiMultipleSend that is run on every node */
137
138 #if CMK_MEM_CHECKPOINT
139 void (*notify_crash_fn)(int) = NULL;
140 #endif
141
142 CpvDeclare(char *, _validProcessors);
143
144 /*****************************************************************************
145  *
146  * Unix Stub Functions
147  *
148  ****************************************************************************/
149
150 #ifdef MEMMONITOR
151 typedef unsigned long mmulong;
152 CpvDeclare(mmulong,MemoryUsage);
153 CpvDeclare(mmulong,HiWaterMark);
154 CpvDeclare(mmulong,ReportedHiWaterMark);
155 CpvDeclare(int,AllocCount);
156 CpvDeclare(int,BlocksAllocated);
157 #endif
158
159 #define MAX_HANDLERS 512
160
161 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
162 CpvDeclare(int,expIOFlushFlag);
163 #if CMI_IO_BUFFER_EXPLICIT
164 /* 250k not too large depending on how slow terminal IO is */
165 #define DEFAULT_IO_BUFFER_SIZE 250000
166 CpvDeclare(char*,explicitIOBuffer);
167 CpvDeclare(int,expIOBufferSize);
168 #endif
169 #endif
170
171 #if CMK_NODE_QUEUE_AVAILABLE
172 void  *CmiGetNonLocalNodeQ();
173 #endif
174
175 CpvDeclare(void*, CsdSchedQueue);
176
177 #if CMK_OUT_OF_CORE
178 /* The Queue where the Prefetch Thread puts the messages from CsdSchedQueue  */
179 CpvDeclare(void*, CsdPrefetchQueue);
180 pthread_mutex_t prefetchLock;
181 #endif
182
183 #if CMK_NODE_QUEUE_AVAILABLE
184 CsvDeclare(void*, CsdNodeQueue);
185 CsvDeclare(CmiNodeLock, CsdNodeQueueLock);
186 #endif
187 CpvDeclare(int,   CsdStopFlag);
188 CpvDeclare(int,   CsdLocalCounter);
189
190 CmiNodeLock _smp_mutex;               /* for smp */
191
192 #if CONVERSE_VERSION_VMI
193 void *CMI_VMI_CmiAlloc (int size);
194 void CMI_VMI_CmiFree (void *ptr);
195 #endif
196
197 #if CONVERSE_VERSION_ELAN
198 void* elan_CmiAlloc(int size);
199 #endif
200
201 #if CMK_USE_IBVERBS | CMK_USE_IBUD
202 void *infi_CmiAlloc(int size);
203 void infi_CmiFree(void *ptr);
204 void infi_freeMultipleSend(void *ptr);
205 void infi_unregAndFreeMeta(void *ch);
206 #endif
207
208
209 #if CMK_GRID_QUEUE_AVAILABLE
210 CpvDeclare(void *, CkGridObject);
211 CpvDeclare(void *, CsdGridQueue);
212 #endif
213
214
215 /*****************************************************************************
216  *
217  * Command-Line Argument (CLA) parsing routines.
218  *
219  *****************************************************************************/
220
221 static int usageChecked=0; /* set when argv has been searched for a usage request */
222 static int printUsage=0; /* if set, print command-line usage information */
223 static const char *CLAformatString="%20s %10s %s\n";
224
225 /** This little list of CLA's holds the argument descriptions until it's
226    safe to print them--it's needed because the net- versions don't have 
227    printf until they're pretty well started.
228  */
229 typedef struct {
230         const char *arg; /* Flag name, like "-foo"*/
231         const char *param; /* Argument's parameter type, like "integer" or "none"*/
232         const char *desc; /* Human-readable description of what it does */
233 } CLA;
234 static int CLAlistLen=0;
235 static int CLAlistMax=0;
236 static CLA *CLAlist=NULL;
237
238 /** Add this CLA */
239 static void CmiAddCLA(const char *arg,const char *param,const char *desc) {
240         int i;
241         if (CmiMyPe()!=0) return; /*Don't bother if we're not PE 0*/
242         if (desc==NULL) return; /*It's an internal argument*/
243         if (usageChecked) { /* Printf should work now */
244                 if (printUsage)
245                         CmiPrintf(CLAformatString,arg,param,desc);
246         }
247         else { /* Printf doesn't work yet-- just add to the list.
248                 This assumes the const char *'s are static references,
249                 which is probably reasonable. */
250                 i=CLAlistLen++;
251                 if (CLAlistLen>CLAlistMax) { /*Grow the CLA list */
252                         CLAlistMax=16+2*CLAlistLen;
253                         CLAlist=realloc(CLAlist,sizeof(CLA)*CLAlistMax);\
254                 }
255                 CLAlist[i].arg=arg;
256                 CLAlist[i].param=param;
257                 CLAlist[i].desc=desc;
258         }
259 }
260
261 /** Print out the stored list of CLA's */
262 static void CmiPrintCLAs(void) {
263         int i;
264         if (CmiMyPe()!=0) return; /*Don't bother if we're not PE 0*/
265         CmiPrintf("Converse Machine Command-line Parameters:\n ");
266         CmiPrintf(CLAformatString,"Option:","Parameter:","Description:");
267         for (i=0;i<CLAlistLen;i++) {
268                 CLA *c=&CLAlist[i];
269                 CmiPrintf(CLAformatString,c->arg,c->param,c->desc);
270         }
271 }
272
273 /**
274  * Determines if command-line usage information should be printed--
275  * that is, if a "-?", "-h", or "--help" flag is present.
276  * Must be called after printf is setup.
277  */
278 void CmiArgInit(char **argv) {
279         int i;
280         for (i=0;argv[i]!=NULL;i++)
281         {
282                 if (0==strcmp(argv[i],"-?") ||
283                     0==strcmp(argv[i],"-h") ||
284                     0==strcmp(argv[i],"--help")) 
285                 {
286                         printUsage=1;
287                         /* Don't delete arg:  CmiDeleteArgs(&argv[i],1);
288                           Leave it there for user program to see... */
289                         CmiPrintCLAs();
290                 }
291         }
292         if (CmiMyPe()==0) { /* Throw away list of stored CLA's */
293                 CLAlistLen=CLAlistMax=0;
294                 free(CLAlist); CLAlist=NULL;
295         }
296         usageChecked=1;
297 }
298
299 /** Return 1 if we're currently printing command-line usage information. */
300 int CmiArgGivingUsage(void) {
301         return (CmiMyPe()==0) && printUsage;
302 }
303
304 /** Identifies the module that accepts the following command-line parameters */
305 void CmiArgGroup(const char *parentName,const char *groupName) {
306         if (CmiArgGivingUsage()) {
307                 if (groupName==NULL) groupName=parentName; /* Start of a new group */
308                 CmiPrintf("\n%s Command-line Parameters:\n",groupName);
309         }
310 }
311
312 /** Count the number of non-NULL arguments in list*/
313 int CmiGetArgc(char **argv)
314 {
315         int i=0,argc=0;
316         if (argv)
317         while (argv[i++]!=NULL)
318                 argc++;
319         return argc;
320 }
321
322 /** Return a new, heap-allocated copy of the argv array*/
323 char **CmiCopyArgs(char **argv)
324 {
325         int argc=CmiGetArgc(argv);
326         char **ret=(char **)malloc(sizeof(char *)*(argc+1));
327         int i;
328         for (i=0;i<=argc;i++)
329                 ret[i]=argv[i];
330         return ret;
331 }
332
333 /** Delete the first k argument from the given list, shifting
334 all other arguments down by k spaces.
335 e.g., argv=={"a","b","c","d",NULL}, k==3 modifies
336 argv={"d",NULL,"c","d",NULL}
337 */
338 void CmiDeleteArgs(char **argv,int k)
339 {
340         int i=0;
341         while ((argv[i]=argv[i+k])!=NULL)
342                 i++;
343 }
344
345 /** Find the given argment and string option in argv.
346 If the argument is present, set the string option and
347 delete both from argv.  If not present, return NULL.
348 e.g., arg=="-name" returns "bob" from
349 argv=={"a.out","foo","-name","bob","bar"},
350 and sets argv={"a.out","foo","bar"};
351 */
352 int CmiGetArgStringDesc(char **argv,const char *arg,char **optDest,const char *desc)
353 {
354         int i;
355         CmiAddCLA(arg,"string",desc);
356         for (i=0;argv[i]!=NULL;i++)
357                 if (0==strcmp(argv[i],arg))
358                 {/*We found the argument*/
359                         if (argv[i+1]==NULL) CmiAbort("Argument not complete!");
360                         *optDest=argv[i+1];
361                         CmiDeleteArgs(&argv[i],2);
362                         return 1;
363                 }
364         return 0;/*Didn't find the argument*/
365 }
366 int CmiGetArgString(char **argv,const char *arg,char **optDest) {
367         return CmiGetArgStringDesc(argv,arg,optDest,"");
368 }
369
370 /** Find the given argument and floating-point option in argv.
371 Remove it and return 1; or return 0.
372 */
373 int CmiGetArgDoubleDesc(char **argv,const char *arg,double *optDest,const char *desc) {
374         char *number=NULL;
375         CmiAddCLA(arg,"number",desc);
376         if (!CmiGetArgStringDesc(argv,arg,&number,NULL)) return 0;
377         if (1!=sscanf(number,"%lg",optDest)) return 0;
378         return 1;
379 }
380 int CmiGetArgDouble(char **argv,const char *arg,double *optDest) {
381         return CmiGetArgDoubleDesc(argv,arg,optDest,"");
382 }
383
384 /** Find the given argument and integer option in argv.
385 If the argument is present, parse and set the numeric option,
386 delete both from argv, and return 1. If not present, return 0.
387 e.g., arg=="-pack" matches argv=={...,"-pack","27",...},
388 argv=={...,"-pack0xf8",...}, and argv=={...,"-pack=0777",...};
389 but not argv=={...,"-packsize",...}.
390 */
391 int CmiGetArgIntDesc(char **argv,const char *arg,int *optDest,const char *desc)
392 {
393         int i;
394         int argLen=strlen(arg);
395         CmiAddCLA(arg,"integer",desc);
396         for (i=0;argv[i]!=NULL;i++)
397                 if (0==strncmp(argv[i],arg,argLen))
398                 {/*We *may* have found the argument*/
399                         const char *opt=NULL;
400                         int nDel=0;
401                         switch(argv[i][argLen]) {
402                         case 0: /* like "-p","27" */
403                                 opt=argv[i+1]; nDel=2; break;
404                         case '=': /* like "-p=27" */
405                                 opt=&argv[i][argLen+1]; nDel=1; break;
406                         case '-':case '+':
407                         case '0':case '1':case '2':case '3':case '4':
408                         case '5':case '6':case '7':case '8':case '9':
409                                 /* like "-p27" */
410                                 opt=&argv[i][argLen]; nDel=1; break;
411                         default:
412                                 continue; /*False alarm-- skip it*/
413                         }
414                         if (opt==NULL) continue; /*False alarm*/
415                         if (sscanf(opt,"%i",optDest)<1) {
416                         /*Bad command line argument-- die*/
417                                 fprintf(stderr,"Cannot parse %s option '%s' "
418                                         "as an integer.\n",arg,opt);
419                                 CmiAbort("Bad command-line argument\n");
420                         }
421                         CmiDeleteArgs(&argv[i],nDel);
422                         return 1;
423                 }
424         return 0;/*Didn't find the argument-- dest is unchanged*/       
425 }
426 int CmiGetArgInt(char **argv,const char *arg,int *optDest) {
427         return CmiGetArgIntDesc(argv,arg,optDest,"");
428 }
429
430 int CmiGetArgLongDesc(char **argv,const char *arg,CmiInt8 *optDest,const char *desc)
431 {
432         int i;
433         int argLen=strlen(arg);
434         CmiAddCLA(arg,"integer",desc);
435         for (i=0;argv[i]!=NULL;i++)
436                 if (0==strncmp(argv[i],arg,argLen))
437                 {/*We *may* have found the argument*/
438                         const char *opt=NULL;
439                         int nDel=0;
440                         switch(argv[i][argLen]) {
441                         case 0: /* like "-p","27" */
442                                 opt=argv[i+1]; nDel=2; break;
443                         case '=': /* like "-p=27" */
444                                 opt=&argv[i][argLen+1]; nDel=1; break;
445                         case '-':case '+':
446                         case '0':case '1':case '2':case '3':case '4':
447                         case '5':case '6':case '7':case '8':case '9':
448                                 /* like "-p27" */
449                                 opt=&argv[i][argLen]; nDel=1; break;
450                         default:
451                                 continue; /*False alarm-- skip it*/
452                         }
453                         if (opt==NULL) continue; /*False alarm*/
454                         if (sscanf(opt,"%ld",optDest)<1) {
455                         /*Bad command line argument-- die*/
456                                 fprintf(stderr,"Cannot parse %s option '%s' "
457                                         "as a long integer.\n",arg,opt);
458                                 CmiAbort("Bad command-line argument\n");
459                         }
460                         CmiDeleteArgs(&argv[i],nDel);
461                         return 1;
462                 }
463         return 0;/*Didn't find the argument-- dest is unchanged*/       
464 }
465 int CmiGetArgLong(char **argv,const char *arg,CmiInt8 *optDest) {
466         return CmiGetArgLongDesc(argv,arg,optDest,"");
467 }
468
469 /** Find the given argument in argv.  If present, delete
470 it and return 1; if not present, return 0.
471 e.g., arg=="-foo" matches argv=={...,"-foo",...} but not
472 argv={...,"-foobar",...}.
473 */
474 int CmiGetArgFlagDesc(char **argv,const char *arg,const char *desc)
475 {
476         int i;
477         CmiAddCLA(arg,"",desc);
478         for (i=0;argv[i]!=NULL;i++)
479                 if (0==strcmp(argv[i],arg))
480                 {/*We found the argument*/
481                         CmiDeleteArgs(&argv[i],1);
482                         return 1;
483                 }
484         return 0;/*Didn't find the argument*/
485 }
486 int CmiGetArgFlag(char **argv,const char *arg) {
487         return CmiGetArgFlagDesc(argv,arg,"");
488 }
489
490
491 /*****************************************************************************
492  *
493  * Stack tracing routines.
494  *
495  *****************************************************************************/
496 #include "cmibacktrace.c"
497
498 /*
499 Convert "X(Y) Z" to "Y Z"-- remove text prior to first '(', and supress
500 the next parenthesis.  Operates in-place on the character data.
501 or Convert X(Y) to "Y" only, when trimname=1
502 */
503 static char *_implTrimParenthesis(char *str, int trimname) {
504   char *lParen=str, *ret=NULL, *rParen=NULL;
505   while (*lParen!='(') {
506     if (*lParen==0) return str; /* No left parenthesis at all. */
507     lParen++;
508   }
509   /* now *lParen=='(', so trim it*/
510   ret=lParen+1;
511   rParen=ret;
512   while (*rParen!=')') {
513     if (*rParen==0) return ret; /* No right parenthesis at all. */
514     rParen++;
515   }
516   /* now *rParen==')', so trim it*/
517   *rParen=trimname?0:' ';
518   return ret;  
519 }
520
521 /*
522 Return the text description of this trimmed routine name, if 
523 it's a system-generated routine where we should stop printing. 
524 This is probably overkill, but improves the appearance of callbacks.
525 */
526 static const char* _implGetBacktraceSys(const char *name) {
527   if (0==strncmp(name,"_call",5)) 
528   { /*it might be something we're interested in*/
529     if (0==strncmp(name,"_call_",6)) return "Call Entry Method";
530     if (0==strncmp(name,"_callthr_",9)) return "Call Threaded Entry Method";
531   }
532   if (0==strncmp(name,"CthResume",9)) return "Resumed thread";
533   if (0==strncmp(name,"qt_args",7)) return "Converse thread";
534   
535   return 0; /*ordinary user routine-- just print normally*/
536 }
537
538 /** Print out the names of these function pointers. */
539 void CmiBacktracePrint(void **retPtrs,int nLevels) {
540   if (nLevels>0) {
541     int i;
542     char **names=CmiBacktraceLookup(retPtrs,nLevels);
543     if (names==NULL) return;
544     CmiPrintf("[%d] Stack Traceback:\n", CmiMyPe());
545     for (i=0;i<nLevels;i++) {
546       if (names[i] == NULL) continue;
547       {
548       const char *trimmed=_implTrimParenthesis(names[i], 0);
549       const char *print=trimmed;
550       const char *sys=_implGetBacktraceSys(print);
551       if (sys) {
552           CmiPrintf("  [%d] Charm++ Runtime: %s (%s)\n",i,sys,print);
553           break; /*Stop when we hit Charm++ runtime.*/
554       } else {
555           CmiPrintf("  [%d:%d] %s\n",CmiMyPe(),i,print);
556       }
557      }
558     }
559     free(names);
560   }
561 }
562
563 /* Print (to stdout) the names of the functions that have been 
564    called up to this point. nSkip is the number of routines on the
565    top of the stack to *not* print out. */
566 void CmiPrintStackTrace(int nSkip) {
567 #if CMK_USE_BACKTRACE
568         int nLevels=max_stack;
569         void *stackPtrs[max_stack];
570         CmiBacktraceRecord(stackPtrs,1+nSkip,&nLevels);
571         CmiBacktracePrint(stackPtrs,nLevels);
572 #endif
573 }
574
575 int CmiIsFortranLibraryCall() {
576 #if CMK_USE_BACKTRACE
577   int ret = 0;
578   int nLevels=9;
579   void *stackPtrs[18];
580   CmiBacktraceRecord(stackPtrs,1,&nLevels);
581   if (nLevels>0) {
582     int i;
583     char **names=CmiBacktraceLookup(stackPtrs,nLevels);
584     if (names==NULL) return 0;
585     for (i=0;i<nLevels;i++) {
586       if (names[i] == NULL) continue;
587       const char *trimmed=_implTrimParenthesis(names[i], 1);
588       if (strncmp(trimmed, "for__", 5) == 0                /* ifort */
589           || strncmp(trimmed, "_xlf", 4) == 0               /* xlf90 */
590           || strncmp(trimmed, "_xlfBeginIO", 11) == 0 
591          )
592           {  /* CmiPrintf("[%d] NAME:%s\n", CmiMyPe(), trimmed); */
593              ret = 1; break; }
594     }
595     free(names);
596   }
597   return ret;
598 #else
599   return 0;
600 #endif
601 }
602
603 /*****************************************************************************
604  *
605  * Statistics: currently, the following statistics are not updated by converse.
606  *
607  *****************************************************************************/
608
609 CpvDeclare(int, CstatsMaxChareQueueLength);
610 CpvDeclare(int, CstatsMaxForChareQueueLength);
611 CpvDeclare(int, CstatsMaxFixedChareQueueLength);
612 CpvStaticDeclare(int, CstatPrintQueueStatsFlag);
613 CpvStaticDeclare(int, CstatPrintMemStatsFlag);
614
615 void CstatsInit(argv)
616 char **argv;
617 {
618
619 #ifdef MEMMONITOR
620   CpvInitialize(mmulong,MemoryUsage);
621   CpvAccess(MemoryUsage) = 0;
622   CpvInitialize(mmulong,HiWaterMark);
623   CpvAccess(HiWaterMark) = 0;
624   CpvInitialize(mmulong,ReportedHiWaterMark);
625   CpvAccess(ReportedHiWaterMark) = 0;
626   CpvInitialize(int,AllocCount);
627   CpvAccess(AllocCount) = 0;
628   CpvInitialize(int,BlocksAllocated);
629   CpvAccess(BlocksAllocated) = 0;
630 #endif
631
632   CpvInitialize(int, CstatsMaxChareQueueLength);
633   CpvInitialize(int, CstatsMaxForChareQueueLength);
634   CpvInitialize(int, CstatsMaxFixedChareQueueLength);
635   CpvInitialize(int, CstatPrintQueueStatsFlag);
636   CpvInitialize(int, CstatPrintMemStatsFlag);
637
638   CpvAccess(CstatsMaxChareQueueLength) = 0;
639   CpvAccess(CstatsMaxForChareQueueLength) = 0;
640   CpvAccess(CstatsMaxFixedChareQueueLength) = 0;
641   CpvAccess(CstatPrintQueueStatsFlag) = 0;
642   CpvAccess(CstatPrintMemStatsFlag) = 0;
643
644 #if 0
645   if (CmiGetArgFlagDesc(argv,"+mems", "Print memory statistics at shutdown"))
646     CpvAccess(CstatPrintMemStatsFlag)=1;
647   if (CmiGetArgFlagDesc(argv,"+qs", "Print queue statistics at shutdown"))
648     CpvAccess(CstatPrintQueueStatsFlag)=1;
649 #endif
650 }
651
652 int CstatMemory(i)
653 int i;
654 {
655   return 0;
656 }
657
658 int CstatPrintQueueStats()
659 {
660   return CpvAccess(CstatPrintQueueStatsFlag);
661 }
662
663 int CstatPrintMemStats()
664 {
665   return CpvAccess(CstatPrintMemStatsFlag);
666 }
667
668 /*****************************************************************************
669  *
670  * Cmi handler registration
671  *
672  *****************************************************************************/
673
674 CpvDeclare(CmiHandlerInfo*, CmiHandlerTable);
675 CpvStaticDeclare(int  , CmiHandlerCount);
676 CpvStaticDeclare(int  , CmiHandlerLocal);
677 CpvStaticDeclare(int  , CmiHandlerGlobal);
678 CpvDeclare(int,         CmiHandlerMax);
679
680 static void CmiExtendHandlerTable(int atLeastLen) {
681     int max = CpvAccess(CmiHandlerMax);
682     int newmax = (atLeastLen+(atLeastLen>>2)+32);
683     int bytes = max*sizeof(CmiHandlerInfo);
684     int newbytes = newmax*sizeof(CmiHandlerInfo);
685     CmiHandlerInfo *nu = (CmiHandlerInfo*)malloc(newbytes);
686     CmiHandlerInfo *tab = CpvAccess(CmiHandlerTable);
687     _MEMCHECK(nu);
688     memcpy(nu, tab, bytes);
689     memset(((char *)nu)+bytes, 0, (newbytes-bytes));
690     free(tab); tab=nu;
691     CpvAccess(CmiHandlerTable) = tab;
692     CpvAccess(CmiHandlerMax) = newmax;
693 }
694
695 void CmiNumberHandler(int n, CmiHandler h)
696 {
697   CmiHandlerInfo *tab;
698   if (n >= CpvAccess(CmiHandlerMax)) CmiExtendHandlerTable(n);
699   tab = CpvAccess(CmiHandlerTable);
700   tab[n].hdlr = (CmiHandlerEx)h; /* LIE!  This assumes extra pointer will be ignored!*/
701   tab[n].userPtr = 0;
702 }
703 void CmiNumberHandlerEx(int n, CmiHandlerEx h,void *userPtr) {
704   CmiHandlerInfo *tab;
705   if (n >= CpvAccess(CmiHandlerMax)) CmiExtendHandlerTable(n);
706   tab = CpvAccess(CmiHandlerTable);
707   tab[n].hdlr = h;
708   tab[n].userPtr=userPtr;
709 }
710
711 #if CMI_LOCAL_GLOBAL_AVAILABLE /*Leave room for local and global handlers*/
712 #  define DIST_BETWEEN_HANDLERS 3
713 #else /*No local or global handlers; ordinary handlers are back-to-back*/
714 #  define DIST_BETWEEN_HANDLERS 1
715 #endif
716
717 int CmiRegisterHandler(CmiHandler h)
718 {
719   int Count = CpvAccess(CmiHandlerCount);
720   CmiNumberHandler(Count, h);
721   CpvAccess(CmiHandlerCount) = Count+DIST_BETWEEN_HANDLERS;
722   return Count;
723 }
724 int CmiRegisterHandlerEx(CmiHandlerEx h,void *userPtr)
725 {
726   int Count = CpvAccess(CmiHandlerCount);
727   CmiNumberHandlerEx(Count, h, userPtr);
728   CpvAccess(CmiHandlerCount) = Count+DIST_BETWEEN_HANDLERS;
729   return Count;
730 }
731
732 #if CMI_LOCAL_GLOBAL_AVAILABLE
733 int CmiRegisterHandlerLocal(h)
734 CmiHandler h;
735 {
736   int Local = CpvAccess(CmiHandlerLocal);
737   CmiNumberHandler(Local, h);
738   CpvAccess(CmiHandlerLocal) = Local+3;
739   return Local;
740 }
741
742 int CmiRegisterHandlerGlobal(h)
743 CmiHandler h;
744 {
745   int Global = CpvAccess(CmiHandlerGlobal);
746   if (CmiMyPe()!=0) 
747     CmiError("CmiRegisterHandlerGlobal must only be called on PE 0.\n");
748   CmiNumberHandler(Global, h);
749   CpvAccess(CmiHandlerGlobal) = Global+3;
750   return Global;
751 }
752 #endif
753
754 static void _cmiZeroHandler(void *msg) {
755         CmiAbort("Converse zero handler executed-- was a message corrupted?\n");
756 }
757
758 static void CmiHandlerInit()
759 {
760   CpvInitialize(CmiHandlerInfo *, CmiHandlerTable);
761   CpvInitialize(int         , CmiHandlerCount);
762   CpvInitialize(int         , CmiHandlerLocal);
763   CpvInitialize(int         , CmiHandlerGlobal);
764   CpvInitialize(int         , CmiHandlerMax);
765   CpvAccess(CmiHandlerCount)  = 0;
766   CpvAccess(CmiHandlerLocal)  = 1;
767   CpvAccess(CmiHandlerGlobal) = 2;
768   CpvAccess(CmiHandlerMax) = 0; /* Table will be extended on the first registration*/
769   CpvAccess(CmiHandlerTable) = NULL;
770   CmiRegisterHandler((CmiHandler)_cmiZeroHandler);
771 }
772
773
774 /******************************************************************************
775  *
776  * CmiTimer
777  *
778  * Here are two possible implementations of CmiTimer.  Some machines don't
779  * select either, and define the timer in machine.c instead.
780  *
781  *****************************************************************************/
782
783 #if CMK_TIMER_USE_TIMES
784
785 CpvStaticDeclare(double, clocktick);
786 CpvStaticDeclare(int,inittime_wallclock);
787 CpvStaticDeclare(int,inittime_virtual);
788
789 int CmiTimerIsSynchronized()
790 {
791   return 0;
792 }
793
794 void CmiTimerInit()
795 {
796   struct tms temp;
797   CpvInitialize(double, clocktick);
798   CpvInitialize(int, inittime_wallclock);
799   CpvInitialize(int, inittime_virtual);
800   CpvAccess(inittime_wallclock) = times(&temp);
801   CpvAccess(inittime_virtual) = temp.tms_utime + temp.tms_stime;
802   CpvAccess(clocktick) = 1.0 / (sysconf(_SC_CLK_TCK));
803 }
804
805 double CmiWallTimer()
806 {
807   struct tms temp;
808   double currenttime;
809   int now;
810
811   now = times(&temp);
812   currenttime = (now - CpvAccess(inittime_wallclock)) * CpvAccess(clocktick);
813   return (currenttime);
814 }
815
816 double CmiCpuTimer()
817 {
818   struct tms temp;
819   double currenttime;
820   int now;
821
822   times(&temp);
823   now = temp.tms_stime + temp.tms_utime;
824   currenttime = (now - CpvAccess(inittime_virtual)) * CpvAccess(clocktick);
825   return (currenttime);
826 }
827
828 double CmiTimer()
829 {
830   return CmiCpuTimer();
831 }
832
833 #endif
834
835 #if CMK_TIMER_USE_GETRUSAGE
836
837 static double inittime_wallclock;
838 CpvStaticDeclare(double, inittime_virtual);
839
840 int CmiTimerIsSynchronized()
841 {
842   return 0;
843 }
844
845 void CmiTimerInit()
846 {
847   struct timeval tv;
848   struct rusage ru;
849   CpvInitialize(double, inittime_virtual);
850
851 #if ! CMK_MEM_CHECKPOINT
852   /* try to synchronize calling barrier */
853   CmiBarrier();
854   CmiBarrier();
855   CmiBarrier();
856 #endif
857
858   gettimeofday(&tv,0);
859   inittime_wallclock = (tv.tv_sec * 1.0) + (tv.tv_usec*0.000001);
860   getrusage(0, &ru); 
861   CpvAccess(inittime_virtual) =
862     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
863     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
864
865 #if ! CMK_MEM_CHECKPOINT
866   CmiBarrier();
867 /*  CmiBarrierZero(); */
868 #endif
869 }
870
871 double CmiCpuTimer()
872 {
873   struct rusage ru;
874   double currenttime;
875
876   getrusage(0, &ru);
877   currenttime =
878     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
879     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
880   return currenttime - CpvAccess(inittime_virtual);
881 }
882
883 static double lastT = -1.0;
884
885 double CmiWallTimer()
886 {
887   struct timeval tv;
888   double currenttime;
889
890   gettimeofday(&tv,0);
891   currenttime = (tv.tv_sec * 1.0) + (tv.tv_usec * 0.000001);
892 #if CMK_ERROR_CHECKING
893   if (lastT > 0.0 && currenttime < lastT) {
894     currenttime = lastT;
895   }
896   lastT = currenttime;
897 #endif
898   return currenttime - inittime_wallclock;
899 }
900
901 double CmiTimer()
902 {
903   return CmiCpuTimer();
904 }
905
906 #endif
907
908 #if CMK_TIMER_USE_RDTSC
909
910 static double readMHz(void)
911 {
912   double x;
913   char str[1000];
914   char buf[100];
915   FILE *fp;
916   CmiLock(_smp_mutex);
917   fp = fopen("/proc/cpuinfo", "r");
918   if (fp != NULL)
919   while(fgets(str, 1000, fp)!=0) {
920     if(sscanf(str, "cpu MHz%[^:]",buf)==1)
921     {
922       char *s = strchr(str, ':'); s=s+1;
923       sscanf(s, "%lf", &x);
924       fclose(fp);
925       CmiUnlock(_smp_mutex);
926       return x;
927     }
928   }
929   CmiUnlock(_smp_mutex);
930   CmiAbort("Cannot read CPU MHz from /proc/cpuinfo file.");
931   return 0.0;
932 }
933
934 double _cpu_speed_factor;
935 CpvStaticDeclare(double, inittime_virtual);
936 CpvStaticDeclare(double, inittime_walltime);
937
938 double  CmiStartTimer(void)
939 {
940   return CpvAccess(inittime_walltime);
941 }
942
943 void CmiTimerInit()
944 {
945   struct rusage ru;
946
947   CmiBarrier();
948   CmiBarrier();
949
950   _cpu_speed_factor = 1.0/(readMHz()*1.0e6); 
951   rdtsc(); rdtsc(); rdtsc(); rdtsc(); rdtsc();
952   CpvInitialize(double, inittime_walltime);
953   CpvAccess(inittime_walltime) = CmiWallTimer();
954   CpvInitialize(double, inittime_virtual);
955   getrusage(0, &ru); 
956   CpvAccess(inittime_virtual) =
957     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
958     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
959
960   CmiBarrierZero();
961 }
962
963 double CmiCpuTimer()
964 {
965   struct rusage ru;
966   double currenttime;
967
968   getrusage(0, &ru);
969   currenttime =
970     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
971     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
972   return currenttime - CpvAccess(inittime_virtual);
973 }
974
975 #endif
976
977 #if CMK_BLUEGENEL || CMK_BLUEGENEP
978 #include "dcopy.h"
979 #endif
980
981 #if CMK_TIMER_USE_BLUEGENEL
982
983 #include "rts.h"
984
985 #if 0 
986 #define SPRN_TBRL 0x10C  /* Time Base Read Lower Register (user & sup R/O) */
987 #define SPRN_TBRU 0x10D  /* Time Base Read Upper Register (user & sup R/O) */
988 #define SPRN_PIR  0x11E  /* CPU id */
989
990 static inline unsigned long long BGLTimebase(void)
991 {
992   unsigned volatile u1, u2, lo;
993   union
994   {
995     struct { unsigned hi, lo; } w;
996     unsigned long long d;
997   } result;
998                                                                                 
999   do {
1000     asm volatile ("mfspr %0,%1" : "=r" (u1) : "i" (SPRN_TBRU));
1001     asm volatile ("mfspr %0,%1" : "=r" (lo) : "i" (SPRN_TBRL));
1002     asm volatile ("mfspr %0,%1" : "=r" (u2) : "i" (SPRN_TBRU));
1003   } while (u1!=u2);
1004                                                                                 
1005   result.w.lo = lo;
1006   result.w.hi = u2;
1007   return result.d;
1008 }
1009 #endif
1010
1011 static unsigned long long inittime_wallclock = 0;
1012 CpvStaticDeclare(double, clocktick);
1013
1014 int CmiTimerIsSynchronized()
1015 {
1016   return 0;
1017 }
1018
1019 void CmiTimerInit()
1020 {
1021   BGLPersonality dst;
1022   CpvInitialize(double, clocktick);
1023   int size = sizeof(BGLPersonality);
1024   rts_get_personality(&dst, size);
1025   CpvAccess(clocktick) = 1.0 / dst.clockHz;
1026
1027   /* try to synchronize calling barrier */
1028   CmiBarrier();
1029   CmiBarrier();
1030   CmiBarrier();
1031
1032   /* inittime_wallclock = rts_get_timebase(); */
1033   inittime_wallclock = 0.0;    /* use bgl absolute time */
1034 }
1035
1036 double CmiWallTimer()
1037 {
1038   unsigned long long currenttime;
1039   currenttime = rts_get_timebase();
1040   return CpvAccess(clocktick)*(currenttime-inittime_wallclock);
1041 }
1042
1043 double CmiCpuTimer()
1044 {
1045   return CmiWallTimer();
1046 }
1047
1048 double CmiTimer()
1049 {
1050   return CmiWallTimer();
1051 }
1052
1053 #endif
1054
1055 #if CMK_TIMER_USE_BLUEGENEP  /* This module just compiles with GCC charm. */
1056
1057 void CmiTimerInit() {}
1058
1059 #if 0
1060 #include "common/bgp_personality.h"
1061 #include <spi/bgp_SPI.h>
1062
1063 #define SPRN_TBRL 0x10C  /* Time Base Read Lower Register (user & sup R/O) */
1064 #define SPRN_TBRU 0x10D  /* Time Base Read Upper Register (user & sup R/O) */
1065 #define SPRN_PIR  0x11E  /* CPU id */
1066
1067 static inline unsigned long long BGPTimebase(void)
1068 {
1069   unsigned volatile u1, u2, lo;
1070   union
1071   {
1072     struct { unsigned hi, lo; } w;
1073     unsigned long long d;
1074   } result;
1075                                                                          
1076   do {
1077     asm volatile ("mfspr %0,%1" : "=r" (u1) : "i" (SPRN_TBRU));
1078     asm volatile ("mfspr %0,%1" : "=r" (lo) : "i" (SPRN_TBRL));
1079     asm volatile ("mfspr %0,%1" : "=r" (u2) : "i" (SPRN_TBRU));
1080   } while (u1!=u2);
1081                                                                          
1082   result.w.lo = lo;
1083   result.w.hi = u2;
1084   return result.d;
1085 }
1086
1087 static unsigned long long inittime_wallclock = 0;
1088 CpvStaticDeclare(double, clocktick);
1089
1090 int CmiTimerIsSynchronized()
1091 {
1092   return 0;
1093 }
1094
1095 void CmiTimerInit()
1096 {
1097   _BGP_Personality_t dst;
1098   CpvInitialize(double, clocktick);
1099   int size = sizeof(_BGP_Personality_t);
1100   rts_get_personality(&dst, size);
1101
1102   CpvAccess(clocktick) = 1.0 / (dst.Kernel_Config.FreqMHz * 1e6);
1103
1104   /* try to synchronize calling barrier */
1105   CmiBarrier();
1106   CmiBarrier();
1107   CmiBarrier();
1108
1109   inittime_wallclock = BGPTimebase (); 
1110 }
1111
1112 double CmiWallTimer()
1113 {
1114   unsigned long long currenttime;
1115   currenttime = BGPTimebase();
1116   return CpvAccess(clocktick)*(currenttime-inittime_wallclock);
1117 }
1118 #endif
1119
1120 #include "dcmf.h"
1121
1122 double CmiWallTimer () {
1123   return DCMF_Timer();
1124 }
1125
1126 double CmiCpuTimer()
1127 {
1128   return CmiWallTimer();
1129 }
1130
1131 double CmiTimer()
1132 {
1133   return CmiWallTimer();
1134 }
1135
1136 #endif
1137
1138
1139 #if CMK_TIMER_USE_WIN32API
1140
1141 CpvStaticDeclare(double, inittime_wallclock);
1142 CpvStaticDeclare(double, inittime_virtual);
1143
1144 void CmiTimerInit()
1145 {
1146 #ifdef __CYGWIN__
1147         struct timeb tv;
1148 #else
1149         struct _timeb tv;
1150 #endif
1151         clock_t       ru;
1152
1153         CpvInitialize(double, inittime_wallclock);
1154         CpvInitialize(double, inittime_virtual);
1155         _ftime(&tv);
1156         CpvAccess(inittime_wallclock) = tv.time*1.0 + tv.millitm*0.001;
1157         ru = clock();
1158         CpvAccess(inittime_virtual) = ((double) ru)/CLOCKS_PER_SEC;
1159 }
1160
1161 double CmiCpuTimer()
1162 {
1163         clock_t ru;
1164         double currenttime;
1165
1166         ru = clock();
1167         currenttime = (double) ru/CLOCKS_PER_SEC;
1168
1169         return currenttime - CpvAccess(inittime_virtual);
1170 }
1171
1172 double CmiWallTimer()
1173 {
1174 #ifdef __CYGWIN__
1175         struct timeb tv;
1176 #else
1177         struct _timeb tv;
1178 #endif
1179         double currenttime;
1180
1181         _ftime(&tv);
1182         currenttime = tv.time*1.0 + tv.millitm*0.001;
1183
1184         return currenttime - CpvAccess(inittime_wallclock);
1185 }
1186         
1187
1188 double CmiTimer()
1189 {
1190         return CmiCpuTimer();
1191 }
1192
1193 #endif
1194
1195 #if CMK_TIMER_USE_RTC
1196
1197 #if __crayx1
1198  /* For _rtc() on Cray X1 */
1199 #include <intrinsics.h>
1200 #endif
1201
1202 static double clocktick;
1203 CpvStaticDeclare(long long, inittime_wallclock);
1204
1205 void CmiTimerInit()
1206 {
1207   CpvInitialize(long long, inittime_wallclock);
1208   CpvAccess(inittime_wallclock) = _rtc();
1209   clocktick = 1.0 / (double)(sysconf(_SC_SV2_USER_TIME_RATE));
1210 }
1211
1212 double CmiWallTimer()
1213 {
1214   long long now;
1215
1216   now = _rtc();
1217   return (clocktick * (now - CpvAccess(inittime_wallclock)));
1218 }
1219
1220 double CmiCpuTimer()
1221 {
1222   return CmiWallTimer();
1223 }
1224
1225 double CmiTimer()
1226 {
1227   return CmiCpuTimer();
1228 }
1229
1230 #endif
1231
1232 #if CMK_TIMER_USE_AIX_READ_TIME
1233
1234 #include <sys/time.h>
1235
1236 static timebasestruct_t inittime_wallclock;
1237 static double clocktick;
1238 CpvStaticDeclare(double, inittime_virtual);
1239
1240 void CmiTimerInit()
1241 {
1242   struct rusage ru;
1243
1244   if (CmiMyRank() == 0) {
1245     read_wall_time(&inittime_wallclock, TIMEBASE_SZ);
1246     time_base_to_time(&inittime_wallclock, TIMEBASE_SZ);
1247   }
1248
1249   CpvInitialize(double, inittime_virtual);
1250   getrusage(0, &ru);
1251   CpvAccess(inittime_virtual) =
1252     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
1253     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
1254 }
1255
1256 double CmiWallTimer()
1257 {
1258   int secs, n_secs;
1259   double curt;
1260   timebasestruct_t now;
1261   read_wall_time(&now, TIMEBASE_SZ);
1262   time_base_to_time(&now, TIMEBASE_SZ);
1263
1264   secs = now.tb_high - inittime_wallclock.tb_high;
1265   n_secs = now.tb_low - inittime_wallclock.tb_low;
1266   if (n_secs < 0)  {
1267     secs--;
1268     n_secs += 1000000000;
1269   }
1270   curt = secs*1.0 + n_secs*1e-9;
1271   return curt;
1272 }
1273
1274 double CmiCpuTimer()
1275 {
1276   struct rusage ru;
1277   double currenttime;
1278
1279   getrusage(0, &ru);
1280   currenttime =
1281     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
1282     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
1283   return currenttime - CpvAccess(inittime_virtual);
1284 }
1285
1286 double CmiTimer()
1287 {
1288   return CmiWallTimer();
1289 }
1290
1291 #endif
1292
1293 #ifndef CMK_USE_SPECIAL_MESSAGE_QUEUE_CHECK
1294 /** Return 1 if our outgoing message queue 
1295    for this node is longer than this many bytes. */
1296 int CmiLongSendQueue(int forNode,int longerThanBytes) {
1297   return 0;
1298 }
1299 #endif
1300
1301 #if CMK_SIGNAL_USE_SIGACTION
1302 #include <signal.h>
1303 void CmiSignal(int sig1, int sig2, int sig3, void (*handler)())
1304 {
1305   struct sigaction in, out ;
1306   in.sa_handler = handler;
1307   sigemptyset(&in.sa_mask);
1308   if (sig1) sigaddset(&in.sa_mask, sig1);
1309   if (sig2) sigaddset(&in.sa_mask, sig2);
1310   if (sig3) sigaddset(&in.sa_mask, sig3);
1311   in.sa_flags = 0;
1312   if (sig1) if (sigaction(sig1, &in, &out)<0) exit(1);
1313   if (sig2) if (sigaction(sig2, &in, &out)<0) exit(1);
1314   if (sig3) if (sigaction(sig3, &in, &out)<0) exit(1);
1315 }
1316 #endif
1317
1318 #if CMK_SIGNAL_USE_SIGACTION_WITH_RESTART
1319 #include <signal.h>
1320 void CmiSignal(sig1, sig2, sig3, handler)
1321 int sig1, sig2, sig3;
1322 void (*handler)();
1323 {
1324   struct sigaction in, out ;
1325   in.sa_handler = handler;
1326   sigemptyset(&in.sa_mask);
1327   if (sig1) sigaddset(&in.sa_mask, sig1);
1328   if (sig2) sigaddset(&in.sa_mask, sig2);
1329   if (sig3) sigaddset(&in.sa_mask, sig3);
1330   in.sa_flags = SA_RESTART;
1331   if (sig1) if (sigaction(sig1, &in, &out)<0) exit(1);
1332   if (sig2) if (sigaction(sig2, &in, &out)<0) exit(1);
1333   if (sig3) if (sigaction(sig3, &in, &out)<0) exit(1);
1334 }
1335 #endif
1336
1337 /** 
1338  *  @addtogroup ConverseScheduler
1339  *  @{
1340  */
1341
1342 /*****************************************************************************
1343  * 
1344  * The following is the CsdScheduler function.  A common
1345  * implementation is provided below.  The machine layer can provide an
1346  * alternate implementation if it so desires.
1347  *
1348  * void CmiDeliversInit()
1349  *
1350  *      - CmiInit promises to call this before calling CmiDeliverMsgs
1351  *        or any of the other functions in this section.
1352  *
1353  * int CmiDeliverMsgs(int maxmsgs)
1354  *
1355  *      - CmiDeliverMsgs will retrieve up to maxmsgs that were transmitted
1356  *        with the Cmi, and will invoke their handlers.  It does not wait
1357  *        if no message is unavailable.  Instead, it returns the quantity
1358  *        (maxmsgs-delivered), where delivered is the number of messages it
1359  *        delivered.
1360  *
1361  * void CmiDeliverSpecificMsg(int handlerno)
1362  *
1363  *      - Waits for a message with the specified handler to show up, then
1364  *        invokes the message's handler.  Note that unlike CmiDeliverMsgs,
1365  *        This function _does_ wait.
1366  *
1367  * For this common implementation to work, the machine layer must provide the
1368  * following:
1369  *
1370  * void *CmiGetNonLocal()
1371  *
1372  *      - returns a message just retrieved from some other PE, not from
1373  *        local.  If no such message exists, returns 0.
1374  *
1375  * CpvExtern(CdsFifo, CmiLocalQueue);
1376  *
1377  *      - a FIFO queue containing all messages from the local processor.
1378  *
1379  *****************************************************************************/
1380
1381 void CsdBeginIdle(void)
1382 {
1383   CcdCallBacks();
1384   _LOG_E_PROC_IDLE();   /* projector */
1385   CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE) ;
1386 }
1387
1388 void CsdStillIdle(void)
1389 {
1390   CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE);
1391 }
1392
1393 void CsdEndIdle(void)
1394 {
1395   _LOG_E_PROC_BUSY();   /* projector */
1396   CcdRaiseCondition(CcdPROCESSOR_BEGIN_BUSY) ;
1397 }
1398
1399 extern int _exitHandlerIdx;
1400
1401 /** Takes a message and calls its corresponding handler. */
1402 void CmiHandleMessage(void *msg)
1403 {
1404 /* this is wrong because it counts the Charm++ messages in sched queue
1405         CpvAccess(cQdState)->mProcessed++;
1406 */
1407         CmiHandlerInfo *h;
1408 #if CMK_TRACE_ENABLED
1409         CmiUInt2 handler=CmiGetHandler(msg); /* Save handler for use after msg is gone */
1410         _LOG_E_HANDLER_BEGIN(handler); /* projector */
1411         /* setMemoryStatus(1) */ /* charmdebug */
1412 #endif
1413
1414 /*
1415         FAULT_EVAC
1416 */
1417 /*      if((!CpvAccess(_validProcessors)[CmiMyPe()]) && handler != _exitHandlerIdx){
1418                 return;
1419         }*/
1420         
1421         MESSAGE_PHASE_CHECK(msg)
1422
1423         h=&CmiGetHandlerInfo(msg);
1424         (h->hdlr)(msg,h->userPtr);
1425 #if CMK_TRACE_ENABLED
1426         /* setMemoryStatus(0) */ /* charmdebug */
1427         _LOG_E_HANDLER_END(handler);    /* projector */
1428 #endif
1429 }
1430
1431 #if CMK_CMIDELIVERS_USE_COMMON_CODE
1432
1433 void CmiDeliversInit()
1434 {
1435 }
1436
1437 int CmiDeliverMsgs(int maxmsgs)
1438 {
1439   return CsdScheduler(maxmsgs);
1440 }
1441
1442 #if CMK_OBJECT_QUEUE_AVAILABLE
1443 CpvDeclare(void *, CsdObjQueue);
1444 #endif
1445
1446 void CsdSchedulerState_new(CsdSchedulerState_t *s)
1447 {
1448 #if CMK_OBJECT_QUEUE_AVAILABLE
1449         s->objQ=CpvAccess(CsdObjQueue);
1450 #endif
1451         s->localQ=CpvAccess(CmiLocalQueue);
1452         s->schedQ=CpvAccess(CsdSchedQueue);
1453         s->localCounter=&(CpvAccess(CsdLocalCounter));
1454 #if CMK_NODE_QUEUE_AVAILABLE
1455         s->nodeQ=CsvAccess(CsdNodeQueue);
1456         s->nodeLock=CsvAccess(CsdNodeQueueLock);
1457 #endif
1458 #if CMK_GRID_QUEUE_AVAILABLE
1459         s->gridQ=CpvAccess(CsdGridQueue);
1460 #endif
1461 }
1462
1463
1464 /** Dequeue and return the next message from the message queue. */
1465 void *CsdNextMessage(CsdSchedulerState_t *s) {
1466         void *msg;
1467         if((*(s->localCounter))-- >0)
1468           {
1469               /* This avoids a race condition with migration detected by megatest*/
1470               msg=CdsFifo_Dequeue(s->localQ);
1471               if (msg!=NULL)
1472                 {
1473                   CpvAccess(cQdState)->mProcessed++;
1474                   return msg;       
1475                 }
1476               CqsDequeue(s->schedQ,(void **)&msg);
1477               if (msg!=NULL) return msg;
1478           }
1479         
1480         *(s->localCounter)=CsdLocalMax;
1481         if ( NULL!=(msg=CmiGetNonLocal()) || 
1482              NULL!=(msg=CdsFifo_Dequeue(s->localQ)) ) {
1483             CpvAccess(cQdState)->mProcessed++;
1484             return msg;
1485         }
1486 #if CMK_GRID_QUEUE_AVAILABLE
1487         /*#warning "CsdNextMessage: CMK_GRID_QUEUE_AVAILABLE" */
1488         CqsDequeue (s->gridQ, (void **) &msg);
1489         if (msg != NULL) {
1490           return (msg);
1491         }
1492 #endif
1493 #if CMK_NODE_QUEUE_AVAILABLE
1494         /*#warning "CsdNextMessage: CMK_NODE_QUEUE_AVAILABLE" */
1495         if (NULL!=(msg=CmiGetNonLocalNodeQ())) return msg;
1496         if (!CqsEmpty(s->nodeQ)
1497          && !CqsPrioGT(CqsGetPriority(s->nodeQ),
1498                        CqsGetPriority(s->schedQ))) {
1499           CmiLock(s->nodeLock);
1500           CqsDequeue(s->nodeQ,(void **)&msg);
1501           CmiUnlock(s->nodeLock);
1502           if (msg!=NULL) return msg;
1503         }
1504 #endif
1505 #if CMK_OBJECT_QUEUE_AVAILABLE
1506         /*#warning "CsdNextMessage: CMK_OBJECT_QUEUE_AVAILABLE"   */
1507         if (NULL!=(msg=CdsFifo_Dequeue(s->objQ))) {
1508           return msg;
1509         }
1510 #endif
1511         if(!CsdLocalMax) {
1512           CqsDequeue(s->schedQ,(void **)&msg);
1513           if (msg!=NULL) return msg;        
1514         }
1515         return NULL;
1516 }
1517
1518 int CsdScheduler(int maxmsgs)
1519 {
1520         if (maxmsgs<0) CsdScheduleForever();    
1521         else if (maxmsgs==0)
1522                 CsdSchedulePoll();
1523         else /*(maxmsgs>0)*/ 
1524                 return CsdScheduleCount(maxmsgs);
1525         return 0;
1526 }
1527
1528 /*Declare the standard scheduler housekeeping*/
1529 #define SCHEDULE_TOP \
1530       void *msg;\
1531       int *CsdStopFlag_ptr = &CpvAccess(CsdStopFlag); \
1532       int cycle = CpvAccess(CsdStopFlag); \
1533       CsdSchedulerState_t state;\
1534       CsdSchedulerState_new(&state);
1535
1536 /*A message is available-- process it*/
1537 #define SCHEDULE_MESSAGE \
1538       CmiHandleMessage(msg);\
1539       if (*CsdStopFlag_ptr != cycle) break;
1540
1541 /*No message available-- go (or remain) idle*/
1542 #define SCHEDULE_IDLE \
1543       if (!isIdle) {isIdle=1;CsdBeginIdle();}\
1544       else CsdStillIdle();\
1545       if (*CsdStopFlag_ptr != cycle) {\
1546         CsdEndIdle();\
1547         break;\
1548       }
1549
1550 /*
1551         EVAC
1552 */
1553 extern void CkClearAllArrayElements();
1554
1555
1556 extern void machine_OffloadAPIProgress();
1557
1558 /** The main scheduler loop that repeatedly executes messages from a queue, forever. */
1559 void CsdScheduleForever(void)
1560 {
1561   #if CMK_CELL
1562     #define CMK_CELL_PROGRESS_FREQ  96  /* (MSG-Q Entries x1.5) */
1563     int progressCount = CMK_CELL_PROGRESS_FREQ;
1564   #endif
1565
1566   #ifdef CMK_CUDA
1567     #define CMK_CUDA_PROGRESS_FREQ 50
1568     int cudaProgressCount = CMK_CUDA_PROGRESS_FREQ;
1569   #endif
1570
1571   int isIdle=0;
1572   SCHEDULE_TOP
1573   while (1) {
1574     msg = CsdNextMessage(&state);
1575     if (msg!=NULL) { /*A message is available-- process it*/
1576       if (isIdle) {isIdle=0;CsdEndIdle();}
1577       SCHEDULE_MESSAGE
1578
1579       #if CMK_CELL
1580         if (progressCount <= 0) {
1581           /*OffloadAPIProgress();*/
1582           machine_OffloadAPIProgress();
1583           progressCount = CMK_CELL_PROGRESS_FREQ;
1584         }
1585         progressCount--;
1586       #endif
1587
1588       #ifdef CMK_CUDA
1589         if (cudaProgressCount == 0) {
1590           gpuProgressFn(); 
1591           cudaProgressCount = CMK_CUDA_PROGRESS_FREQ; 
1592         }
1593         cudaProgressCount--; 
1594       #endif
1595
1596     } else { /*No message available-- go (or remain) idle*/
1597       SCHEDULE_IDLE
1598
1599       #if CMK_CELL
1600         /*OffloadAPIProgress();*/
1601         machine_OffloadAPIProgress();
1602         progressCount = CMK_CELL_PROGRESS_FREQ;
1603       #endif
1604
1605       #ifdef CMK_CUDA
1606         gpuProgressFn(); 
1607         cudaProgressCount = CMK_CUDA_PROGRESS_FREQ;
1608       #endif
1609
1610     }
1611     CsdPeriodic();
1612   }
1613 }
1614 int CsdScheduleCount(int maxmsgs)
1615 {
1616   int isIdle=0;
1617   SCHEDULE_TOP
1618   while (1) {
1619     msg = CsdNextMessage(&state);
1620     if (msg!=NULL) { /*A message is available-- process it*/
1621       if (isIdle) {isIdle=0;CsdEndIdle();}
1622       maxmsgs--; 
1623       SCHEDULE_MESSAGE
1624       if (maxmsgs==0) break;
1625     } else { /*No message available-- go (or remain) idle*/
1626       SCHEDULE_IDLE
1627     }
1628     CsdPeriodic();
1629   }
1630   return maxmsgs;
1631 }
1632
1633 void CsdSchedulePoll(void)
1634 {
1635   SCHEDULE_TOP
1636   while (1)
1637   {
1638         CsdPeriodic();
1639         /*CmiMachineProgressImpl(); ??? */
1640         if (NULL!=(msg = CsdNextMessage(&state)))
1641         {
1642              SCHEDULE_MESSAGE 
1643         }
1644         else break;
1645   }
1646 }
1647
1648 void CmiDeliverSpecificMsg(handler)
1649 int handler;
1650 {
1651   int *msg; int side;
1652   void *localqueue = CpvAccess(CmiLocalQueue);
1653  
1654   side = 0;
1655   while (1) {
1656     CsdPeriodic();
1657     side ^= 1;
1658     if (side) msg = CmiGetNonLocal();
1659     else      msg = CdsFifo_Dequeue(localqueue);
1660     if (msg) {
1661       if (CmiGetHandler(msg)==handler) {
1662         CpvAccess(cQdState)->mProcessed++;
1663         CmiHandleMessage(msg);
1664         return;
1665       } else {
1666         CdsFifo_Enqueue(localqueue, msg);
1667       }
1668     }
1669   }
1670 }
1671  
1672 #endif /* CMK_CMIDELIVERS_USE_COMMON_CODE */
1673
1674 /***************************************************************************
1675  *
1676  * Standin Schedulers.
1677  *
1678  * We use the following strategy to make sure somebody's always running
1679  * the scheduler (CsdScheduler).  Initially, we assume the main thread
1680  * is responsible for this.  If the main thread blocks, we create a
1681  * "standin scheduler" thread to replace it.  If the standin scheduler
1682  * blocks, we create another standin scheduler to replace that one,
1683  * ad infinitum.  Collectively, the main thread and all the standin
1684  * schedulers are called "scheduling threads".
1685  *
1686  * Suppose the main thread is blocked waiting for data, and a standin
1687  * scheduler is running instead.  Suppose, then, that the data shows
1688  * up and the main thread is CthAwakened.  This causes a token to be
1689  * pushed into the queue.  When the standin pulls the token from the
1690  * queue and handles it, the standin goes to sleep, and control shifts
1691  * back to the main thread.  In this way, unnecessary standins are put
1692  * back to sleep.  These sleeping standins are stored on the
1693  * CthSleepingStandins list.
1694  *
1695  ***************************************************************************/
1696
1697 CpvStaticDeclare(CthThread, CthMainThread);
1698 CpvStaticDeclare(CthThread, CthSchedulingThread);
1699 CpvStaticDeclare(CthThread, CthSleepingStandins);
1700 CpvDeclare(int      , CthResumeNormalThreadIdx);
1701 CpvStaticDeclare(int      , CthResumeSchedulingThreadIdx);
1702
1703
1704 void CthStandinCode()
1705 {
1706   while (1) CsdScheduler(0);
1707 }
1708
1709 /* this fix the function pointer for thread migration and pup */
1710 static CthThread CthSuspendNormalThread()
1711 {
1712   return CpvAccess(CthSchedulingThread);
1713 }
1714
1715 void CthEnqueueSchedulingThread(CthThreadToken *token, int, int, unsigned int*);
1716 CthThread CthSuspendSchedulingThread();
1717
1718 CthThread CthSuspendSchedulingThread()
1719 {
1720   CthThread succ = CpvAccess(CthSleepingStandins);
1721
1722   if (succ) {
1723     CpvAccess(CthSleepingStandins) = CthGetNext(succ);
1724   } else {
1725     succ = CthCreate(CthStandinCode, 0, 256000);
1726     CthSetStrategy(succ,
1727                    CthEnqueueSchedulingThread,
1728                    CthSuspendSchedulingThread);
1729   }
1730   
1731   CpvAccess(CthSchedulingThread) = succ;
1732   return succ;
1733 }
1734
1735 /* Notice: For changes to the following function, make sure the function CthResumeNormalThreadDebug is also kept updated. */
1736 void CthResumeNormalThread(CthThreadToken* token)
1737 {
1738   CthThread t = token->thread;
1739
1740   /* BIGSIM_OOC DEBUGGING
1741   CmiPrintf("Resume normal thread with token[%p] ==> thread[%p]\n", token, t);
1742   */
1743
1744   if(t == NULL){
1745     free(token);
1746     return;
1747   }
1748 #if CMK_TRACE_ENABLED
1749 #if ! CMK_TRACE_IN_CHARM
1750   if(CpvAccess(traceOn))
1751     CthTraceResume(t);
1752 /*    if(CpvAccess(_traceCoreOn)) 
1753                 resumeTraceCore();*/
1754 #endif
1755 #endif
1756   
1757   /* BIGSIM_OOC DEBUGGING
1758   CmiPrintf("In CthResumeNormalThread:   ");
1759   CthPrintThdMagic(t);
1760   */
1761
1762   CthResume(t);
1763 }
1764
1765 void CthResumeSchedulingThread(CthThreadToken  *token)
1766 {
1767   CthThread t = token->thread;
1768   CthThread me = CthSelf();
1769   if (me == CpvAccess(CthMainThread)) {
1770     CthEnqueueSchedulingThread(CthGetToken(me),CQS_QUEUEING_FIFO, 0, 0);
1771   } else {
1772     CthSetNext(me, CpvAccess(CthSleepingStandins));
1773     CpvAccess(CthSleepingStandins) = me;
1774   }
1775   CpvAccess(CthSchedulingThread) = t;
1776 #if CMK_TRACE_ENABLED
1777 #if ! CMK_TRACE_IN_CHARM
1778   if(CpvAccess(traceOn))
1779     CthTraceResume(t);
1780 /*    if(CpvAccess(_traceCoreOn)) 
1781                 resumeTraceCore();*/
1782 #endif
1783 #endif
1784   CthResume(t);
1785 }
1786
1787 void CthEnqueueNormalThread(CthThreadToken* token, int s, 
1788                                    int pb,unsigned int *prio)
1789 {
1790   CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx));
1791   CsdEnqueueGeneral(token, s, pb, prio);
1792 }
1793
1794 void CthEnqueueSchedulingThread(CthThreadToken* token, int s, 
1795                                        int pb,unsigned int *prio)
1796 {
1797   CmiSetHandler(token, CpvAccess(CthResumeSchedulingThreadIdx));
1798   CsdEnqueueGeneral(token, s, pb, prio);
1799 }
1800
1801 void CthSetStrategyDefault(CthThread t)
1802 {
1803   CthSetStrategy(t,
1804                  CthEnqueueNormalThread,
1805                  CthSuspendNormalThread);
1806 }
1807
1808 void CthSchedInit()
1809 {
1810   CpvInitialize(CthThread, CthMainThread);
1811   CpvInitialize(CthThread, CthSchedulingThread);
1812   CpvInitialize(CthThread, CthSleepingStandins);
1813   CpvInitialize(int      , CthResumeNormalThreadIdx);
1814   CpvInitialize(int      , CthResumeSchedulingThreadIdx);
1815
1816   CpvAccess(CthMainThread) = CthSelf();
1817   CpvAccess(CthSchedulingThread) = CthSelf();
1818   CpvAccess(CthSleepingStandins) = 0;
1819   CpvAccess(CthResumeNormalThreadIdx) =
1820     CmiRegisterHandler((CmiHandler)CthResumeNormalThread);
1821   CpvAccess(CthResumeSchedulingThreadIdx) =
1822     CmiRegisterHandler((CmiHandler)CthResumeSchedulingThread);
1823   CthSetStrategy(CthSelf(),
1824                  CthEnqueueSchedulingThread,
1825                  CthSuspendSchedulingThread);
1826 }
1827
1828 void CsdInit(argv)
1829   char **argv;
1830 {
1831   CpvInitialize(void *, CsdSchedQueue);
1832   CpvInitialize(int,   CsdStopFlag);
1833   CpvInitialize(int,   CsdLocalCounter);
1834   if(!CmiGetArgIntDesc(argv,"+csdLocalMax",&CsdLocalMax,"Set the max number of local messages to process before forcing a check for remote messages."))
1835     {
1836       CsdLocalMax= CSD_LOCAL_MAX_DEFAULT;
1837     }
1838   CpvAccess(CsdLocalCounter) = CsdLocalMax;
1839   CpvAccess(CsdSchedQueue) = (void *)CqsCreate();
1840
1841 #if CMK_OBJECT_QUEUE_AVAILABLE
1842   CpvInitialize(void *,CsdObjQueue);
1843   CpvAccess(CsdObjQueue) = CdsFifo_Create();
1844 #endif
1845
1846 #if CMK_NODE_QUEUE_AVAILABLE
1847   CsvInitialize(CmiLock, CsdNodeQueueLock);
1848   CsvInitialize(void *, CsdNodeQueue);
1849   if (CmiMyRank() ==0) {
1850         CsvAccess(CsdNodeQueueLock) = CmiCreateLock();
1851         CsvAccess(CsdNodeQueue) = (void *)CqsCreate();
1852   }
1853   CmiNodeAllBarrier();
1854 #endif
1855
1856 #if CMK_GRID_QUEUE_AVAILABLE
1857   CsvInitialize(void *, CsdGridQueue);
1858   CpvAccess(CsdGridQueue) = (void *)CqsCreate();
1859 #endif
1860
1861   CpvAccess(CsdStopFlag)  = 0;
1862 }
1863
1864
1865
1866 /** 
1867  *  @}
1868  */
1869
1870
1871 /*****************************************************************************
1872  *
1873  * Vector Send
1874  *
1875  * The last parameter "system" is by default at zero, in which case the normal
1876  * messages are sent. If it is set to 1, the CmiChunkHeader prepended to every
1877  * CmiAllocced message will also be sent (except for the first one). Useful for
1878  * AllToAll communication, and other system features. If system is 1, also all
1879  * the messages will be padded to 8 bytes. Thus, the caller must be aware of
1880  * that.
1881  *
1882  ****************************************************************************/
1883
1884 #if CMK_VECTOR_SEND_USES_COMMON_CODE
1885
1886 void CmiSyncVectorSend(int destPE, int n, int *sizes, char **msgs) {
1887   int total;
1888   char *mesg;
1889 #if CMK_USE_IBVERBS
1890   VECTOR_COMPACT(total, mesg, n, sizes, msgs,sizeof(infiCmiChunkHeader));
1891 #else
1892   VECTOR_COMPACT(total, mesg, n, sizes, msgs,sizeof(CmiChunkHeader));
1893 #endif  
1894   CmiSyncSendAndFree(destPE, total, mesg);
1895 }
1896
1897 CmiCommHandle CmiASyncVectorSend(int destPE, int n, int *sizes, char **msgs) {
1898   CmiSyncVectorSend(destPE, n, sizes, msgs);
1899   return NULL;
1900 }
1901
1902 void CmiSyncVectorSendAndFree(int destPE, int n, int *sizes, char **msgs) {
1903   int i;
1904   CmiSyncVectorSend(destPE, n, sizes, msgs);
1905   for(i=0;i<n;i++) CmiFree(msgs[i]);
1906   CmiFree(sizes);
1907   CmiFree(msgs);
1908 }
1909
1910 #endif
1911
1912 /*****************************************************************************
1913  *
1914  * Reduction management
1915  *
1916  * Only one reduction can be active at a single time in the program.
1917  * Moreover, since every call is supposed to pass in the same arguments,
1918  * having some static variables is not a problem for multithreading.
1919  * 
1920  * Except for "data" and "size", all the other parameters (which are all function
1921  * pointers) MUST be the same in every processor. Having different processors
1922  * pass in different function pointers results in an undefined behaviour.
1923  * 
1924  * The data passed in to CmiReduce and CmiNodeReduce is deleted by the system,
1925  * and MUST be allocated with CmiAlloc. The data passed in to the "Struct"
1926  * functions is deleted with the provided function, or it is left intact if no
1927  * function is specified.
1928  * 
1929  * The destination handler for the the first form MUST be embedded into the
1930  * message's header.
1931  * 
1932  * The pup function is used to pup the input data structure into a message to
1933  * be sent to the parent processor. This pup routine is currently used only
1934  * for sizing and packing, NOT unpacking. It MUST be non-null.
1935  * 
1936  * The merge function receives as first parameter the input "data", being it
1937  * a message or a complex data structure (it is up to the user to interpret it
1938  * correctly), and a list of incoming (packed) messages from the children.
1939  * The merge function is responsible to delete "data" if this is no longer needed.
1940  * The system will be in charge of deleting the messages passed in as the second
1941  * argument, and the return value of the function (using the provided deleteFn in
1942  * the second version, or CmiFree in the first). The merge function can return
1943  * data if the merge can be performed in-place. It MUST be non-null.
1944  * 
1945  * At the destination, on processor zero, the final data returned by the last
1946  * merge call will not be deleted by the system, and the CmiHandler function
1947  * will be in charge of its deletion.
1948  * 
1949  * CmiReduce/CmiReduceStruct MUST be called once by every processor,
1950  * CmiNodeReduce/CmiNodeReduceStruct MUST be called once by every node, and in
1951  * particular by the rank zero in each node.
1952  ****************************************************************************/
1953
1954 CpvStaticDeclare(int, CmiReductionMessageHandler);
1955 CpvStaticDeclare(int, CmiReductionDynamicRequestHandler);
1956
1957 CpvStaticDeclare(CmiReduction**, _reduce_info);
1958 CpvStaticDeclare(int, _reduce_info_size); /* This is the log2 of the size of the array */
1959 CpvStaticDeclare(CmiUInt2, _reduce_seqID_global); /* This is used only by global reductions */
1960 CpvStaticDeclare(CmiUInt2, _reduce_seqID_request);
1961 CpvStaticDeclare(CmiUInt2, _reduce_seqID_dynamic);
1962
1963 enum {
1964   CmiReductionID_globalOffset = 0, /* Reductions that involve the whole set of processors */
1965   CmiReductionID_requestOffset = 1, /* Reductions IDs that are requested by all the processors (i.e during intialization) */
1966   CmiReductionID_dynamicOffset = 2, /* Reductions IDs that are requested by only one processor (typically at runtime) */
1967   CmiReductionID_multiplier = 3
1968 };
1969
1970 CmiReduction* CmiGetReductionCreate(int id, short int numChildren) {
1971   int index = id & ~((~0)<<CpvAccess(_reduce_info_size));
1972   CmiReduction *red = CpvAccess(_reduce_info)[index];
1973   if (red != NULL && red->seqID != id) {
1974     /* The table needs to be expanded */
1975     CmiAbort("Too many simultaneous reductions");
1976   }
1977   if (red == NULL || red->numChildren < numChildren) {
1978     CmiReduction *newred;
1979     if (red != NULL) CmiPrintf("[%d] Reduction structure reallocated\n",CmiMyPe());
1980     CmiAssert(red == NULL || red->localContributed == 0);
1981     if (numChildren == 0) numChildren = 4;
1982     newred = (CmiReduction*)malloc(sizeof(CmiReduction)+numChildren*sizeof(void*));
1983     newred->numRemoteReceived = 0;
1984     newred->localContributed = 0;
1985     newred->seqID = id;
1986     if (red != NULL) {
1987       memcpy(newred, red, sizeof(CmiReduction)+red->numChildren*sizeof(void*));
1988       free(red);
1989     }
1990     red = newred;
1991     red->numChildren = numChildren;
1992     red->remoteData = (char**)(red+1);
1993     CpvAccess(_reduce_info)[index] = red;
1994   }
1995   return red;
1996 }
1997
1998 CmiReduction* CmiGetReduction(int id) {
1999   return CmiGetReductionCreate(id, 0);
2000 }
2001
2002 void CmiClearReduction(int id) {
2003   int index = id & ~((~0)<<CpvAccess(_reduce_info_size));
2004   free(CpvAccess(_reduce_info)[index]);
2005   CpvAccess(_reduce_info)[index] = NULL;
2006 }
2007
2008 CmiReduction* CmiGetNextReduction(short int numChildren) {
2009   int id = CpvAccess(_reduce_seqID_global);
2010   CpvAccess(_reduce_seqID_global) += CmiReductionID_multiplier;
2011   if (id > 0xFFF0) CpvAccess(_reduce_seqID_global) = CmiReductionID_globalOffset;
2012   return CmiGetReductionCreate(id, numChildren);
2013 }
2014
2015 CmiReductionID CmiGetGlobalReduction() {
2016   return CpvAccess(_reduce_seqID_request)+=CmiReductionID_multiplier;
2017 }
2018
2019 CmiReductionID CmiGetDynamicReduction() {
2020   if (CmiMyPe() != 0) CmiAbort("Cannot call CmiGetDynamicReduction on processors other than zero!\n");
2021   return CpvAccess(_reduce_seqID_dynamic)+=CmiReductionID_multiplier;
2022 }
2023
2024 void CmiReductionHandleDynamicRequest(char *msg) {
2025   int *values = (int*)(msg+CmiMsgHeaderSizeBytes);
2026   int pe = values[0];
2027   int size = CmiMsgHeaderSizeBytes+2*sizeof(int)+values[1];
2028   values[0] = CmiGetDynamicReduction();
2029   CmiSetHandler(msg, CmiGetXHandler(msg));
2030   if (pe >= 0) {
2031     CmiSyncSendAndFree(pe, size, msg);
2032   } else {
2033     CmiSyncBroadcastAllAndFree(size, msg);
2034   }
2035 }
2036
2037 void CmiGetDynamicReductionRemote(int handlerIdx, int pe, int dataSize, void *data) {
2038   int size = CmiMsgHeaderSizeBytes+2*sizeof(int)+dataSize;
2039   char *msg = (char*)CmiAlloc(size);
2040   int *values = (int*)(msg+CmiMsgHeaderSizeBytes);
2041   values[0] = pe;
2042   values[1] = dataSize;
2043   CmiSetXHandler(msg, handlerIdx);
2044   if (dataSize) memcpy(msg+CmiMsgHeaderSizeBytes+2*sizeof(int), data, dataSize);
2045   if (CmiMyPe() == 0) {
2046     CmiReductionHandleDynamicRequest(msg);
2047   } else {
2048     /* send the request to processor 0 */
2049     CmiSetHandler(msg, CpvAccess(CmiReductionDynamicRequestHandler));
2050     CmiSyncSendAndFree(0, size, msg);
2051   }
2052 }
2053
2054 void CmiSendReduce(CmiReduction *red) {
2055   void *mergedData, *msg;
2056   int msg_size;
2057   if (!red->localContributed || red->numChildren != red->numRemoteReceived) return;
2058   mergedData = red->localData;
2059   msg_size = red->localSize;
2060   if (red->numChildren > 0) {
2061     int i, offset=0;
2062     if (red->ops.pupFn != NULL) {
2063       offset = CmiReservedHeaderSize;
2064       for (i=0; i<red->numChildren; ++i) red->remoteData[i] += offset;
2065     }
2066     mergedData = (red->ops.mergeFn)(&msg_size, red->localData, (void **)red->remoteData, red->numChildren);
2067     for (i=0; i<red->numChildren; ++i) CmiFree(red->remoteData[i] - offset);
2068   }
2069   /*CpvAccess(_reduce_num_children) = 0;*/
2070   /*CpvAccess(_reduce_received) = 0;*/
2071   msg = mergedData;
2072   if (red->parent != -1) {
2073     if (red->ops.pupFn != NULL) {
2074       pup_er p = pup_new_sizer();
2075       (red->ops.pupFn)(p, mergedData);
2076       msg_size = pup_size(p) + CmiReservedHeaderSize;
2077       pup_destroy(p);
2078       msg = CmiAlloc(msg_size);
2079       p = pup_new_toMem((void*)(((char*)msg)+CmiReservedHeaderSize));
2080       (red->ops.pupFn)(p, mergedData);
2081       pup_destroy(p);
2082       if (red->ops.deleteFn != NULL) (red->ops.deleteFn)(red->localData);
2083     }
2084     CmiSetHandler(msg, CpvAccess(CmiReductionMessageHandler));
2085     CmiSetRedID(msg, red->seqID);
2086     /*CmiPrintf("CmiSendReduce(%d): sending %d bytes to %d\n",CmiMyPe(),msg_size,red->parent);*/
2087     CmiSyncSendAndFree(red->parent, msg_size, msg);
2088   } else {
2089     (red->ops.destination)(msg);
2090   }
2091   CmiClearReduction(red->seqID);
2092 }
2093
2094 void *CmiReduceMergeFn_random(int *size, void *data, void** remote, int n) {
2095   return data;
2096 }
2097
2098 static void CmiGlobalReduce(void *msg, int size, CmiReduceMergeFn mergeFn, CmiReduction *red) {
2099   CmiAssert(red->localContributed == 0);
2100   red->localContributed = 1;
2101   red->localData = msg;
2102   red->localSize = size;
2103   red->numChildren = CmiNumSpanTreeChildren(CmiMyPe());
2104   red->parent = CmiSpanTreeParent(CmiMyPe());
2105   red->ops.destination = (CmiHandler)CmiGetHandlerFunction(msg);
2106   red->ops.mergeFn = mergeFn;
2107   red->ops.pupFn = NULL;
2108   /*CmiPrintf("[%d] CmiReduce::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);*/
2109   CmiSendReduce(red);
2110 }
2111
2112 static void CmiGlobalReduceStruct(void *data, CmiReducePupFn pupFn,
2113                      CmiReduceMergeFn mergeFn, CmiHandler dest,
2114                      CmiReduceDeleteFn deleteFn, CmiReduction *red) {
2115   CmiAssert(red->localContributed == 0);
2116   red->localContributed = 1;
2117   red->localData = data;
2118   red->localSize = 0;
2119   red->numChildren = CmiNumSpanTreeChildren(CmiMyPe());
2120   red->parent = CmiSpanTreeParent(CmiMyPe());
2121   red->ops.destination = dest;
2122   red->ops.mergeFn = mergeFn;
2123   red->ops.pupFn = pupFn;
2124   red->ops.deleteFn = deleteFn;
2125   /*CmiPrintf("[%d] CmiReduceStruct::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);*/
2126   CmiSendReduce(red);
2127 }
2128
2129 void CmiReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
2130   CmiReduction *red = CmiGetNextReduction(CmiNumSpanTreeChildren(CmiMyPe()));
2131   CmiGlobalReduce(msg, size, mergeFn, red);
2132 }
2133
2134 void CmiReduceStruct(void *data, CmiReducePupFn pupFn,
2135                      CmiReduceMergeFn mergeFn, CmiHandler dest,
2136                      CmiReduceDeleteFn deleteFn) {
2137   CmiReduction *red = CmiGetNextReduction(CmiNumSpanTreeChildren(CmiMyPe()));
2138   CmiGlobalReduceStruct(data, pupFn, mergeFn, dest, deleteFn, red);
2139 }
2140
2141 void CmiReduceID(void *msg, int size, CmiReduceMergeFn mergeFn, CmiReductionID id) {
2142   CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
2143   CmiGlobalReduce(msg, size, mergeFn, red);
2144 }
2145
2146 void CmiReduceStructID(void *data, CmiReducePupFn pupFn,
2147                      CmiReduceMergeFn mergeFn, CmiHandler dest,
2148                      CmiReduceDeleteFn deleteFn, CmiReductionID id) {
2149   CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
2150   CmiGlobalReduceStruct(data, pupFn, mergeFn, dest, deleteFn, red);
2151 }
2152
2153 void CmiListReduce(int npes, int *pes, void *msg, int size, CmiReduceMergeFn mergeFn, CmiReductionID id) {
2154   CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
2155   int myPos;
2156   CmiAssert(red->localContributed == 0);
2157   red->localContributed = 1;
2158   red->localData = msg;
2159   red->localSize = size;
2160   for (myPos=0; myPos<npes; ++myPos) {
2161     if (pes[myPos] == CmiMyPe()) break;
2162   }
2163   CmiAssert(myPos < npes);
2164   red->numChildren = npes - (myPos << 2) - 1;
2165   if (red->numChildren > 4) red->numChildren = 4;
2166   if (red->numChildren < 0) red->numChildren = 0;
2167   if (myPos == 0) red->parent = -1;
2168   else red->parent = pes[(myPos - 1) >> 2];
2169   red->ops.destination = (CmiHandler)CmiGetHandlerFunction(msg);
2170   red->ops.mergeFn = mergeFn;
2171   red->ops.pupFn = NULL;
2172   /*CmiPrintf("[%d] CmiListReduce::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);*/
2173   CmiSendReduce(red);
2174 }
2175
2176 void CmiListReduceStruct(int npes, int *pes,
2177                      void *data, CmiReducePupFn pupFn,
2178                      CmiReduceMergeFn mergeFn, CmiHandler dest,
2179                      CmiReduceDeleteFn deleteFn, CmiReductionID id) {
2180   CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
2181   int myPos;
2182   CmiAssert(red->localContributed == 0);
2183   red->localContributed = 1;
2184   red->localData = data;
2185   red->localSize = 0;
2186   for (myPos=0; myPos<npes; ++myPos) {
2187     if (pes[myPos] == CmiMyPe()) break;
2188   }
2189   CmiAssert(myPos < npes);
2190   red->numChildren = npes - (myPos << 2) - 1;
2191   if (red->numChildren > 4) red->numChildren = 4;
2192   if (red->numChildren < 0) red->numChildren = 0;
2193   red->parent = (myPos - 1) >> 2;
2194   if (myPos == 0) red->parent = -1;
2195   red->ops.destination = dest;
2196   red->ops.mergeFn = mergeFn;
2197   red->ops.pupFn = pupFn;
2198   red->ops.deleteFn = deleteFn;
2199   CmiSendReduce(red);
2200 }
2201
2202 void CmiGroupReduce(CmiGroup grp, void *msg, int size, CmiReduceMergeFn mergeFn, CmiReductionID id) {
2203   int npes, *pes;
2204   CmiLookupGroup(grp, &npes, &pes);
2205   CmiListReduce(npes, pes, msg, size, mergeFn, id);
2206 }
2207
2208 void CmiGroupReduceStruct(CmiGroup grp, void *data, CmiReducePupFn pupFn,
2209                      CmiReduceMergeFn mergeFn, CmiHandler dest,
2210                      CmiReduceDeleteFn deleteFn, CmiReductionID id) {
2211   int npes, *pes;
2212   CmiLookupGroup(grp, &npes, &pes);
2213   CmiListReduceStruct(npes, pes, data, pupFn, mergeFn, dest, deleteFn, id);
2214 }
2215
2216 void CmiNodeReduce(void *data, int size, CmiReduceMergeFn mergeFn, int redID, int numChildren, int parent) {
2217   CmiAbort("Feel free to implement CmiNodeReduce...");
2218   /*
2219   CmiAssert(CmiRankOf(CmiMyPe()) == 0);
2220   CpvAccess(_reduce_data) = data;
2221   CpvAccess(_reduce_data_size) = size;
2222   CpvAccess(_reduce_parent) = CmiNodeFirst(CmiNodeSpanTreeParent(CmiMyNode()));
2223   _reduce_destination = (CmiHandler)CmiGetHandlerFunction(data);
2224   _reduce_pupFn = NULL;
2225   _reduce_mergeFn = mergeFn;
2226   CpvAccess(_reduce_num_children) = CmiNumNodeSpanTreeChildren(CmiMyNode());
2227   if (CpvAccess(_reduce_received) == CpvAccess(_reduce_num_children)) CmiSendReduce(size);
2228   */
2229 }
2230 #if 0
2231 void CmiNodeReduce(void *data, int size, void * (*mergeFn)(void*,void**,int), int redID) {
2232   CmiNodeReduce(data, size, mergeFn, redID, CmiNumNodeSpanTreeChildren(CmiMyNode()),
2233       CmiNodeFirst(CmiNodeSpanTreeParent(CmiMyNode())));
2234 }
2235 void CmiNodeReduce(void *data, int size, void * (*mergeFn)(void*,void**,int), int numChildren, int parent) {
2236   CmiNodeReduce(data, size, mergeFn, CmiReduceNextID(), numChildren, parent);
2237 }
2238 void CmiNodeReduce(void *data, int size, void * (*mergeFn)(void*,void**,int)) {
2239   CmiNodeReduce(data, size, mergeFn, CmiReduceNextID(), CmiNumNodeSpanTreeChildren(CmiMyNode()),
2240       CmiNodeFirst(CmiNodeSpanTreeParent(CmiMyNode())));
2241 }
2242 #endif
2243
2244 void CmiNodeReduceStruct(void *data, CmiReducePupFn pupFn,
2245                          CmiReduceMergeFn mergeFn, CmiHandler dest,
2246                          CmiReduceDeleteFn deleteFn) {
2247   CmiAbort("Feel free to implement CmiNodeReduceStruct...");
2248 /*
2249   CmiAssert(CmiRankOf(CmiMyPe()) == 0);
2250   CpvAccess(_reduce_data) = data;
2251   CpvAccess(_reduce_parent) = CmiNodeFirst(CmiNodeSpanTreeParent(CmiMyNode()));
2252   _reduce_destination = dest;
2253   _reduce_pupFn = pupFn;
2254   _reduce_mergeFn = mergeFn;
2255   _reduce_deleteFn = deleteFn;
2256   CpvAccess(_reduce_num_children) = CmiNumNodeSpanTreeChildren(CmiMyNode());
2257   if (CpvAccess(_reduce_received) == CpvAccess(_reduce_num_children)) CmiSendReduce(0);
2258   */
2259 }
2260
2261 void CmiHandleReductionMessage(void *msg) {
2262   CmiReduction *red = CmiGetReduction(CmiGetRedID(msg));
2263   if (red->numRemoteReceived == red->numChildren) red = CmiGetReductionCreate(CmiGetRedID(msg), red->numChildren+4);
2264   red->remoteData[red->numRemoteReceived++] = msg;
2265   /*CmiPrintf("[%d] CmiReduce::remote %hd\n",CmiMyPe(),red->seqID);*/
2266   CmiSendReduce(red);
2267 /*
2268   CpvAccess(_reduce_msg_list)[CpvAccess(_reduce_received)++] = msg;
2269   if (CpvAccess(_reduce_received) == CpvAccess(_reduce_num_children)) CmiSendReduce();
2270   / *else CmiPrintf("CmiHandleReductionMessage(%d): %d - %d\n",CmiMyPe(),CpvAccess(_reduce_received),CpvAccess(_reduce_num_children));*/
2271 }
2272
2273 void CmiReductionsInit() {
2274   int i;
2275   CpvInitialize(int, CmiReductionMessageHandler);
2276   CpvAccess(CmiReductionMessageHandler) = CmiRegisterHandler((CmiHandler)CmiHandleReductionMessage);
2277   CpvInitialize(int, CmiReductionDynamicRequestHandler);
2278   CpvAccess(CmiReductionDynamicRequestHandler) = CmiRegisterHandler((CmiHandler)CmiReductionHandleDynamicRequest);
2279   CpvInitialize(CmiUInt2, _reduce_seqID_global);
2280   CpvAccess(_reduce_seqID_global) = CmiReductionID_globalOffset;
2281   CpvInitialize(CmiUInt2, _reduce_seqID_request);
2282   CpvAccess(_reduce_seqID_request) = CmiReductionID_requestOffset;
2283   CpvInitialize(CmiUInt2, _reduce_seqID_dynamic);
2284   CpvAccess(_reduce_seqID_dynamic) = CmiReductionID_dynamicOffset;
2285   CpvInitialize(int, _reduce_info_size);
2286   CpvAccess(_reduce_info_size) = 4;
2287   CpvInitialize(CmiReduction**, _reduce_info);
2288   CpvAccess(_reduce_info) = malloc(16*sizeof(CmiReduction*));
2289   for (i=0; i<16; ++i) CpvAccess(_reduce_info)[i] = NULL;
2290 }
2291
2292 /*****************************************************************************
2293  *
2294  * Multicast groups
2295  *
2296  ****************************************************************************/
2297
2298 #if CMK_MULTICAST_DEF_USE_COMMON_CODE
2299
2300 typedef struct GroupDef
2301 {
2302   union {
2303     char core[CmiMsgHeaderSizeBytes];
2304     struct GroupDef *next;
2305   } core;
2306   CmiGroup group;
2307   int npes;
2308   int pes[1];
2309 }
2310 *GroupDef;
2311
2312 #define GROUPTAB_SIZE 101
2313
2314 CpvStaticDeclare(int, CmiGroupHandlerIndex);
2315 CpvStaticDeclare(int, CmiGroupCounter);
2316 CpvStaticDeclare(GroupDef *, CmiGroupTable);
2317
2318 void CmiGroupHandler(GroupDef def)
2319 {
2320   /* receive group definition, insert into group table */
2321   GroupDef *table = CpvAccess(CmiGroupTable);
2322   unsigned int hashval, bucket;
2323   hashval = (def->group.id ^ def->group.pe);
2324   bucket = hashval % GROUPTAB_SIZE;
2325   def->core.next = table[bucket];
2326   table[bucket] = def;
2327 }
2328
2329 CmiGroup CmiEstablishGroup(int npes, int *pes)
2330 {
2331   /* build new group definition, broadcast it */
2332   CmiGroup grp; GroupDef def; int len, i;
2333   grp.id = CpvAccess(CmiGroupCounter)++;
2334   grp.pe = CmiMyPe();
2335   len = sizeof(struct GroupDef)+(npes*sizeof(int));
2336   def = (GroupDef)CmiAlloc(len);
2337   def->group = grp;
2338   def->npes = npes;
2339   for (i=0; i<npes; i++)
2340     def->pes[i] = pes[i];
2341   CmiSetHandler(def, CpvAccess(CmiGroupHandlerIndex));
2342   CmiSyncBroadcastAllAndFree(len, def);
2343   return grp;
2344 }
2345
2346 void CmiLookupGroup(CmiGroup grp, int *npes, int **pes)
2347 {
2348   unsigned int hashval, bucket;  GroupDef def;
2349   GroupDef *table = CpvAccess(CmiGroupTable);
2350   hashval = (grp.id ^ grp.pe);
2351   bucket = hashval % GROUPTAB_SIZE;
2352   for (def=table[bucket]; def; def=def->core.next) {
2353     if ((def->group.id == grp.id)&&(def->group.pe == grp.pe)) {
2354       *npes = def->npes;
2355       *pes = def->pes;
2356       return;
2357     }
2358   }
2359   *npes = 0; *pes = 0;
2360 }
2361
2362 void CmiGroupInit()
2363 {
2364   CpvInitialize(int, CmiGroupHandlerIndex);
2365   CpvInitialize(int, CmiGroupCounter);
2366   CpvInitialize(GroupDef *, CmiGroupTable);
2367   CpvAccess(CmiGroupHandlerIndex) = CmiRegisterHandler((CmiHandler)CmiGroupHandler);
2368   CpvAccess(CmiGroupCounter) = 0;
2369   CpvAccess(CmiGroupTable) =
2370     (GroupDef*)calloc(GROUPTAB_SIZE, sizeof(GroupDef));
2371   if (CpvAccess(CmiGroupTable) == 0)
2372     CmiAbort("Memory Allocation Error");
2373 }
2374
2375 #endif
2376
2377 /*****************************************************************************
2378  *
2379  * Common List-Cast and Multicast Code
2380  *
2381  ****************************************************************************/
2382
2383 #if CMK_MULTICAST_LIST_USE_COMMON_CODE
2384
2385 void CmiSyncListSendFn(int npes, int *pes, int len, char *msg)
2386 {
2387   int i;
2388   for(i=0;i<npes;i++) {
2389     CmiSyncSend(pes[i], len, msg);
2390   }
2391 }
2392
2393 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int len, char *msg)
2394 {
2395   /* A better asynchronous implementation may be wanted, but at least it works */
2396   CmiSyncListSendFn(npes, pes, len, msg);
2397   return (CmiCommHandle) 0;
2398 }
2399
2400 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
2401 {
2402   int i;
2403   for(i=0;i<npes-1;i++) {
2404     CmiSyncSend(pes[i], len, msg);
2405   }
2406   if (npes>0)
2407     CmiSyncSendAndFree(pes[npes-1], len, msg);
2408   else 
2409     CmiFree(msg);
2410 }
2411
2412 #endif
2413
2414 #if CMK_MULTICAST_GROUP_USE_COMMON_CODE
2415
2416 typedef struct MultiMsg
2417 {
2418   char core[CmiMsgHeaderSizeBytes];
2419   CmiGroup group;
2420   int pos;
2421   int origlen;
2422 }
2423 *MultiMsg;
2424
2425
2426 CpvDeclare(int, CmiMulticastHandlerIndex);
2427
2428 void CmiMulticastDeliver(MultiMsg msg)
2429 {
2430   int npes, *pes; int olen, nlen, pos, child1, child2;
2431   olen = msg->origlen;
2432   nlen = olen + sizeof(struct MultiMsg);
2433   CmiLookupGroup(msg->group, &npes, &pes);
2434   if (pes==0) {
2435     CmiSyncSendAndFree(CmiMyPe(), nlen, msg);
2436     return;
2437   }
2438   if (npes==0) {
2439     CmiFree(msg);
2440     return;
2441   }
2442   if (msg->pos == -1) {
2443     msg->pos=0;
2444     CmiSyncSendAndFree(pes[0], nlen, msg);
2445     return;
2446   }
2447   pos = msg->pos;
2448   child1 = ((pos+1)<<1);
2449   child2 = child1-1;
2450   if (child1 < npes) {
2451     msg->pos = child1;
2452     CmiSyncSend(pes[child1], nlen, msg);
2453   }
2454   if (child2 < npes) {
2455     msg->pos = child2;
2456     CmiSyncSend(pes[child2], nlen, msg);
2457   }
2458   if(olen < sizeof(struct MultiMsg)) {
2459     memcpy(msg, msg+1, olen);
2460   } else {
2461     memcpy(msg, (((char*)msg)+olen), sizeof(struct MultiMsg));
2462   }
2463   CmiSyncSendAndFree(CmiMyPe(), olen, msg);
2464 }
2465
2466 void CmiMulticastHandler(MultiMsg msg)
2467 {
2468   CmiMulticastDeliver(msg);
2469 }
2470
2471 void CmiSyncMulticastFn(CmiGroup grp, int len, char *msg)
2472 {
2473   int newlen; MultiMsg newmsg;
2474   newlen = len + sizeof(struct MultiMsg);
2475   newmsg = (MultiMsg)CmiAlloc(newlen);
2476   if(len < sizeof(struct MultiMsg)) {
2477     memcpy(newmsg+1, msg, len);
2478   } else {
2479     memcpy(newmsg+1, msg+sizeof(struct MultiMsg), len-sizeof(struct MultiMsg));
2480     memcpy(((char *)newmsg+len), msg, sizeof(struct MultiMsg));
2481   }
2482   newmsg->group = grp;
2483   newmsg->origlen = len;
2484   newmsg->pos = -1;
2485   CmiSetHandler(newmsg, CpvAccess(CmiMulticastHandlerIndex));
2486   CmiMulticastDeliver(newmsg);
2487 }
2488
2489 void CmiFreeMulticastFn(CmiGroup grp, int len, char *msg)
2490 {
2491   CmiSyncMulticastFn(grp, len, msg);
2492   CmiFree(msg);
2493 }
2494
2495 CmiCommHandle CmiAsyncMulticastFn(CmiGroup grp, int len, char *msg)
2496 {
2497   CmiError("Async Multicast not implemented.");
2498   return (CmiCommHandle) 0;
2499 }
2500
2501 void CmiMulticastInit()
2502 {
2503   CpvInitialize(int, CmiMulticastHandlerIndex);
2504   CpvAccess(CmiMulticastHandlerIndex) =
2505     CmiRegisterHandler((CmiHandler)CmiMulticastHandler);
2506 }
2507
2508 #endif
2509
2510 #if CONVERSE_VERSION_SHMEM && CMK_ARENA_MALLOC
2511 extern void *arena_malloc(int size);
2512 extern void arena_free(void *blockPtr);
2513 #endif
2514
2515 /***************************************************************************
2516  *
2517  * Memory Allocation routines 
2518  *
2519  * A block of memory can consist of multiple chunks.  Each chunk has
2520  * a sizefield and a refcount.  The first chunk's refcount is a reference
2521  * count.  That's how many CmiFrees it takes to free the message.
2522  * Subsequent chunks have a refcount which is less than zero.  This is
2523  * the offset back to the start of the first chunk.
2524  *
2525  * Each chunk has a CmiChunkHeader before the user data, with the fields:
2526  *
2527  *  size: The user-allocated size of the chunk, in bytes.
2528  *
2529  *  ref: A magic reference count object. Ordinary blocks start with
2530  *     reference count 1.  When the reference count reaches zero,
2531  *     the block is deleted.  To support nested buffers, the 
2532  *     reference count can also be negative, which means it is 
2533  *     a byte offset to the enclosing buffer's reference count.
2534  *
2535  ***************************************************************************/
2536
2537
2538 void *CmiAlloc(int size)
2539 {
2540   char *res;
2541
2542 #if CONVERSE_VERSION_ELAN
2543   res = (char *) elan_CmiAlloc(size+sizeof(CmiChunkHeader));
2544 #elif CONVERSE_VERSION_VMI
2545   res = (char *) CMI_VMI_CmiAlloc(size+sizeof(CmiChunkHeader));
2546 #elif CONVERSE_VERSION_SHMEM && CMK_ARENA_MALLOC
2547   res = (char*) arena_malloc(size+sizeof(CmiChunkHeader));
2548 #elif CMK_USE_IBVERBS | CMK_USE_IBUD
2549   res = (char *) infi_CmiAlloc(size+sizeof(CmiChunkHeader));
2550 #elif CONVERSE_POOL
2551   res =(char *) CmiPoolAlloc(size+sizeof(CmiChunkHeader));
2552 #else
2553   res =(char *) malloc_nomigrate(size+sizeof(CmiChunkHeader));
2554 #endif
2555
2556   _MEMCHECK(res);
2557
2558 #ifdef MEMMONITOR
2559   CpvAccess(MemoryUsage) += size+sizeof(CmiChunkHeader);
2560   CpvAccess(AllocCount)++;
2561   CpvAccess(BlocksAllocated)++;
2562   if (CpvAccess(MemoryUsage) > CpvAccess(HiWaterMark)) {
2563     CpvAccess(HiWaterMark) = CpvAccess(MemoryUsage);
2564   }
2565   if (CpvAccess(MemoryUsage) > 1.1 * CpvAccess(ReportedHiWaterMark)) {
2566     CmiPrintf("HIMEM STAT PE%d: %d Allocs, %d blocks, %lu K, Max %lu K\n",
2567             CmiMyPe(), CpvAccess(AllocCount), CpvAccess(BlocksAllocated),
2568             CpvAccess(MemoryUsage)/1024, CpvAccess(HiWaterMark)/1024);
2569     CpvAccess(ReportedHiWaterMark) = CpvAccess(MemoryUsage);
2570   }
2571   if ((CpvAccess(AllocCount) % 1000) == 0) {
2572     CmiPrintf("MEM STAT PE%d: %d Allocs, %d blocks, %lu K, Max %lu K\n",
2573             CmiMyPe(), CpvAccess(AllocCount), CpvAccess(BlocksAllocated),
2574             CpvAccess(MemoryUsage)/1024, CpvAccess(HiWaterMark)/1024);
2575   }
2576 #endif
2577
2578   res+=sizeof(CmiChunkHeader);
2579   SIZEFIELD(res)=size;
2580   REFFIELD(res)=1;
2581   return (void *)res;
2582 }
2583
2584 /** Follow the header links out to the most enclosing block */
2585 static void *CmiAllocFindEnclosing(void *blk) {
2586   int refCount = REFFIELD(blk);
2587   while (refCount < 0) {
2588     blk = (void *)((char*)blk+refCount); /* Jump to enclosing block */
2589     refCount = REFFIELD(blk);
2590   }
2591   return blk;
2592 }
2593
2594 /** Increment the reference count for this block's owner.
2595     This call must be matched by an equivalent CmiFree. */
2596 void CmiReference(void *blk)
2597 {
2598   REFFIELD(CmiAllocFindEnclosing(blk))++;
2599 }
2600
2601 /** Return the size of the user portion of this block. */
2602 int CmiSize(void *blk)
2603 {
2604   return SIZEFIELD(blk);
2605 }
2606
2607 /** Decrement the reference count for this block. */
2608 void CmiFree(void *blk)
2609 {
2610   void *parentBlk=CmiAllocFindEnclosing(blk);
2611   int refCount=REFFIELD(parentBlk);
2612 #if CMK_ERROR_CHECKING
2613   if(refCount==0) /* Logic error: reference count shouldn't already have been zero */
2614     CmiAbort("CmiFree reference count was zero-- is this a duplicate free?");
2615 #endif
2616   refCount--;
2617   REFFIELD(parentBlk) = refCount;
2618   if(refCount==0) { /* This was the last reference to the block-- free it */
2619 #ifdef MEMMONITOR
2620     int size=SIZEFIELD(parentBlk);
2621     if (size > 1000000000) /* Absurdly large size field-- warning */
2622       CmiPrintf("MEMSTAT Uh-oh -- SIZEFIELD=%d\n",size);
2623     CpvAccess(MemoryUsage) -= (size + sizeof(CmiChunkHeader));
2624     CpvAccess(BlocksAllocated)--;
2625 #endif
2626
2627 #if CONVERSE_VERSION_ELAN
2628     elan_CmiFree(BLKSTART(parentBlk));
2629 #elif CONVERSE_VERSION_VMI
2630     CMI_VMI_CmiFree(BLKSTART(parentBlk));
2631 #elif CONVERSE_VERSION_SHMEM && CMK_ARENA_MALLOC
2632     arena_free(BLKSTART(parentBlk));
2633 #elif CMK_USE_IBVERBS | CMK_USE_IBUD
2634     /* is this message the head of a MultipleSend that we received?
2635        Then the parts with INFIMULTIPOOL have metadata which must be 
2636        unregistered and freed.  */
2637 #ifdef CMK_IBVERS_CLEAN_MULTIPLESEND
2638     if(CmiGetHandler(parentBlk)==CpvAccess(CmiMainHandlerIDP))
2639       {
2640         infi_freeMultipleSend(parentBlk);
2641       }
2642 #endif
2643     infi_CmiFree(BLKSTART(parentBlk));
2644 #elif CONVERSE_POOL
2645     CmiPoolFree(BLKSTART(parentBlk));
2646 #else
2647     free_nomigrate(BLKSTART(parentBlk));
2648 #endif
2649   }
2650 }
2651
2652
2653 /***************************************************************************
2654  *
2655  * Temporary-memory Allocation routines 
2656  *
2657  *  This buffer augments the storage available on the regular machine stack
2658  * for fairly large temporary buffers, which allows us to use smaller machine
2659  * stacks.
2660  *
2661  ***************************************************************************/
2662
2663 #define CMI_TMP_BUF_MAX 128*1024 /* Allow this much temporary storage. */
2664
2665 typedef struct {
2666   char *buf; /* Start of temporary buffer */
2667   int cur; /* First unused location in temporary buffer */
2668   int max; /* Length of temporary buffer */
2669 } CmiTmpBuf_t;
2670 CpvDeclare(CmiTmpBuf_t,CmiTmpBuf); /* One temporary buffer per PE */
2671
2672 static void CmiTmpSetup(CmiTmpBuf_t *b) {
2673   b->buf=malloc(CMI_TMP_BUF_MAX);
2674   b->cur=0;
2675   b->max=CMI_TMP_BUF_MAX;
2676 }
2677
2678 void *CmiTmpAlloc(int size) {
2679   if (!CpvInitialized(CmiTmpBuf)) {
2680     return malloc(size);
2681   }
2682   else { /* regular case */
2683     CmiTmpBuf_t *b=&CpvAccess(CmiTmpBuf);
2684     void *t;
2685     if (b->cur+size>b->max) {
2686       if (b->max==0) /* We're just uninitialized */
2687         CmiTmpSetup(b);
2688       else /* We're really out of space! */
2689         CmiAbort("CmiTmpAlloc: asked for too much temporary buffer space");
2690     }
2691     t=b->buf+b->cur;
2692     b->cur+=size;
2693     return t;
2694   }
2695 }
2696 void CmiTmpFree(void *t) {
2697   if (!CpvInitialized(CmiTmpBuf)) {
2698     free(t);
2699   }
2700   else { /* regular case */
2701     CmiTmpBuf_t *b=&CpvAccess(CmiTmpBuf);
2702     /* t should point into our temporary buffer: figure out where */
2703     int cur=((const char *)t)-b->buf;
2704 #if CMK_ERROR_CHECKING
2705     if (cur<0 || cur>b->max)
2706       CmiAbort("CmiTmpFree: called with an invalid pointer");
2707 #endif
2708     b->cur=cur;
2709   }
2710 }
2711
2712 void CmiTmpInit(char **argv) {
2713   CpvInitialize(CmiTmpBuf_t,CmiTmpBuf);
2714   /* Set up this processor's temporary buffer */
2715   CmiTmpSetup(&CpvAccess(CmiTmpBuf));
2716 }
2717
2718 /******************************************************************************
2719
2720   Cross-platform directory creation
2721
2722   ****************************************************************************/
2723 #ifdef _MSC_VER
2724 /* Windows directory creation: */
2725 #include <windows.h>
2726
2727 void CmiMkdir(const char *dirName) {
2728         CreateDirectory(dirName,NULL);
2729 }
2730
2731 #else /* !_MSC_VER */
2732 /* UNIX directory creation */
2733 #include <unistd.h> 
2734 #include <sys/stat.h> /* from "mkdir" man page */
2735 #include <sys/types.h>
2736
2737 void CmiMkdir(const char *dirName) {
2738 #ifndef __MINGW_H
2739         mkdir(dirName,0777);
2740 #else
2741         mkdir(dirName);
2742 #endif
2743 }
2744
2745 #endif
2746
2747
2748 /******************************************************************************
2749
2750   Multiple Send function                               
2751
2752   ****************************************************************************/
2753
2754
2755
2756
2757
2758 /****************************************************************************
2759 * DESCRIPTION : This function call allows the user to send multiple messages
2760 *               from one processor to another, all intended for differnet 
2761 *               handlers.
2762 *
2763 *               Parameters :
2764 *
2765 *               destPE, len, int sizes[0..len-1], char *messages[0..len-1]
2766 *
2767 ****************************************************************************/
2768 /* Round up message size to the message granularity. 
2769    Does this by adding, then truncating.
2770 */
2771 static int roundUpSize(unsigned int s) {
2772   return (int)((s+sizeof(double)-1)&~(sizeof(double)-1));
2773 }
2774 /* Return the amount of message padding required for a message
2775    with this many user bytes. 
2776  */
2777 static int paddingSize(unsigned int s) {
2778   return roundUpSize(s)-s;
2779 }
2780
2781 /* Message header for a bundle of multiple-sent messages */
2782 typedef struct {
2783   char convHeader[CmiMsgHeaderSizeBytes];
2784   int nMessages; /* Number of distinct messages bundled below. */
2785   double pad; /* To align the first message, which follows this header */
2786 } CmiMultipleSendHeader;
2787
2788 #if CMK_USE_IBVERBS | CMK_USE_IBUD
2789 /* given a pointer to a multisend message clean up the metadata */
2790
2791 void infi_freeMultipleSend(void *msgWhole)
2792 {
2793   int len=((CmiMultipleSendHeader *)msgWhole)->nMessages;
2794   double pad=((CmiMultipleSendHeader *)msgWhole)->pad;
2795   int offset=sizeof(CmiMultipleSendHeader);
2796   int m;
2797   void *thisMsg=NULL;
2798   if (pad != 1234567.89) return;
2799   for(m=0;m<len;m++)
2800     {
2801       /*unreg meta, free meta, move the ptr */
2802       /* note these weird little things are not pooled */
2803       /* do NOT free the message here, we are only a part of this buffer*/
2804       infiCmiChunkHeader *ch=(infiCmiChunkHeader *)(msgWhole+offset);
2805       char *msg=(msgWhole+offset+sizeof(infiCmiChunkHeader));
2806       int msgSize=ch->chunkHeader.size; /* Size of user portion of message (plus padding at end) */
2807       infi_unregAndFreeMeta(ch->metaData);
2808       offset+= sizeof(infiCmiChunkHeader) + msgSize;
2809     }
2810 }
2811 #endif
2812
2813
2814 static void _CmiMultipleSend(unsigned int destPE, int len, int sizes[], char *msgComps[], int immed)
2815 {
2816   CmiMultipleSendHeader header;
2817   int m; /* Outgoing message */
2818
2819 #if CMK_USE_IBVERBS
2820   infiCmiChunkHeader *msgHdr;
2821 #else
2822   CmiChunkHeader *msgHdr; /* Chunk headers for each message */
2823 #endif
2824         
2825   double pad = 0; /* padding required */
2826   int vecLen; /* Number of pieces in outgoing message vector */
2827   int *vecSizes; /* Sizes of each piece we're sending out. */
2828   char **vecPtrs; /* Pointers to each piece we're sending out. */
2829   int vec; /* Entry we're currently filling out in above array */
2830         
2831 #if CMK_USE_IBVERBS
2832   msgHdr = (infiCmiChunkHeader *)CmiTmpAlloc(len * sizeof(infiCmiChunkHeader));
2833 #else
2834   msgHdr = (CmiChunkHeader *)CmiTmpAlloc(len * sizeof(CmiChunkHeader));
2835 #endif
2836         
2837   /* Allocate memory for the outgoing vector*/
2838   vecLen=1+3*len; /* Header and 3 parts per message */
2839   vecSizes = (int *)CmiTmpAlloc(vecLen * sizeof(int));
2840   vecPtrs = (char **)CmiTmpAlloc(vecLen * sizeof(char *));
2841   vec=0;
2842   
2843   /* Build the header */
2844   header.nMessages=len;
2845   CmiSetHandler(&header, CpvAccess(CmiMainHandlerIDP));
2846   header.pad = 1234567.89;
2847 #if CMK_IMMEDIATE_MSG
2848   if (immed) CmiBecomeImmediate(&header);
2849 #endif
2850   vecSizes[vec]=sizeof(header); vecPtrs[vec]=(char *)&header;
2851   vec++;
2852
2853   /* Build an entry for each message: 
2854          | CmiChunkHeader | Message data | Message padding | ...next message entry ...
2855   */
2856   for (m=0;m<len;m++) {
2857 #if CMK_USE_IBVERBS
2858     msgHdr[m].chunkHeader.size=roundUpSize(sizes[m]); /* Size of message and padding */
2859     msgHdr[m].chunkHeader.ref=0; /* Reference count will be filled out on receive side */
2860     msgHdr[m].metaData=NULL;
2861 #else
2862     msgHdr[m].size=roundUpSize(sizes[m]); /* Size of message and padding */
2863     msgHdr[m].ref=0; /* Reference count will be filled out on receive side */
2864 #endif          
2865     
2866     /* First send the message's CmiChunkHeader (for use on receive side) */
2867 #if CMK_USE_IBVERBS
2868     vecSizes[vec]=sizeof(infiCmiChunkHeader);
2869 #else
2870     vecSizes[vec]=sizeof(CmiChunkHeader); 
2871 #endif          
2872                 vecPtrs[vec]=(char *)&msgHdr[m];
2873     vec++;
2874     
2875     /* Now send the actual message data */
2876     vecSizes[vec]=sizes[m]; vecPtrs[vec]=msgComps[m];
2877     vec++;
2878     
2879     /* Now send padding to align the next message on a double-boundary */
2880     vecSizes[vec]=paddingSize(sizes[m]); vecPtrs[vec]=(char *)&pad;
2881     vec++;
2882   }
2883   CmiAssert(vec==vecLen);
2884   
2885   CmiSyncVectorSend(destPE, vecLen, vecSizes, vecPtrs);
2886   
2887   CmiTmpFree(vecPtrs); /* CmiTmp: Be sure to throw away in opposite order of allocation */
2888   CmiTmpFree(vecSizes);
2889   CmiTmpFree(msgHdr);
2890 }
2891
2892 void CmiMultipleSend(unsigned int destPE, int len, int sizes[], char *msgComps[])
2893 {
2894   _CmiMultipleSend(destPE, len, sizes, msgComps, 0);
2895 }
2896
2897 void CmiMultipleIsend(unsigned int destPE, int len, int sizes[], char *msgComps[])
2898 {
2899   _CmiMultipleSend(destPE, len, sizes, msgComps, 1);
2900 }
2901
2902 /****************************************************************************
2903 * DESCRIPTION : This function initializes the main handler required for the
2904 *               CmiMultipleSend() function to work. 
2905 *               
2906 *               This function should be called once in any Converse program
2907 *               that uses CmiMultipleSend()
2908 *
2909 ****************************************************************************/
2910
2911 static void CmiMultiMsgHandler(char *msgWhole);
2912
2913 void CmiInitMultipleSend(void)
2914 {
2915   CpvInitialize(int,CmiMainHandlerIDP); 
2916   CpvAccess(CmiMainHandlerIDP) =
2917     CmiRegisterHandler((CmiHandler)CmiMultiMsgHandler);
2918 }
2919
2920 /****************************************************************************
2921 * DESCRIPTION : This function is the main handler that splits up the messages
2922 *               CmiMultipleSend() pastes together. 
2923 *
2924 ****************************************************************************/
2925
2926 static void CmiMultiMsgHandler(char *msgWhole)
2927 {
2928   int len=((CmiMultipleSendHeader *)msgWhole)->nMessages;
2929   int offset=sizeof(CmiMultipleSendHeader);
2930   int m;
2931   for (m=0;m<len;m++) {
2932 #if CMK_USE_IBVERBS
2933     infiCmiChunkHeader *ch=(infiCmiChunkHeader *)(msgWhole+offset);
2934     char *msg=(msgWhole+offset+sizeof(infiCmiChunkHeader));
2935     int msgSize=ch->chunkHeader.size; /* Size of user portion of message (plus padding at end) */
2936     ch->chunkHeader.ref=msgWhole-msg; 
2937     ch->metaData =  registerMultiSendMesg(msg,msgSize);
2938 #else
2939     CmiChunkHeader *ch=(CmiChunkHeader *)(msgWhole+offset);
2940     char *msg=(msgWhole+offset+sizeof(CmiChunkHeader));
2941     int msgSize=ch->size; /* Size of user portion of message (plus padding at end) */
2942     ch->ref=msgWhole-msg; 
2943 #endif          
2944     /* Link new message to owner via a negative ref pointer */
2945     CmiReference(msg); /* Follows link & increases reference count of *msgWhole* */
2946     CmiSyncSendAndFree(CmiMyPe(), msgSize, msg);
2947 #if CMK_USE_IBVERBS
2948     offset+= sizeof(infiCmiChunkHeader) + msgSize;
2949 #else
2950     offset+= sizeof(CmiChunkHeader) + msgSize;
2951 #endif          
2952   }
2953   /* Release our reference to the whole message.  The message will
2954      only actually be deleted once all its sub-messages are free'd as well. */
2955   CmiFree(msgWhole);
2956 }
2957
2958 /****************************************************************************
2959 * Hypercube broadcast message passing.
2960 ****************************************************************************/
2961
2962 int HypercubeGetBcastDestinations(int mype, int total_pes, int k, int *dest_pes) {
2963   int num_pes = 0;
2964   for ( ; k>=0; --k) {
2965     /* add the processor destination at level k if it exist */
2966     dest_pes[num_pes] = mype ^ (1<<k);
2967     if (dest_pes[num_pes] >= total_pes) {
2968       /* find the first proc in the other part of the current dimention */
2969       dest_pes[num_pes] &= (-1)<<k;
2970       /* if the first proc there is over CmiNumPes() then there is no other
2971          dimension, otherwise if it is valid compute my correspondent in such
2972          a way to minimize the load for every processor */
2973       if (total_pes>dest_pes[num_pes]) dest_pes[num_pes] += (mype - (mype & ((-1)<<k))) % (total_pes - dest_pes[num_pes]);
2974       }
2975     if (dest_pes[num_pes] < total_pes) {
2976       /* if the destination is in the acceptable range increment num_pes */
2977       ++num_pes;
2978     }
2979   }
2980   return num_pes;
2981 }
2982
2983
2984 /****************************************************************************
2985 * DESCRIPTION : This function initializes the main handler required for the
2986 *               Immediate message
2987 *               
2988 *               This function should be called once in any Converse program
2989 *
2990 ****************************************************************************/
2991
2992 int _immediateLock = 0; /* if locked, all immediate message handling will be delayed. */
2993 int _immediateFlag = 0; /* if set, there is delayed immediate message. */
2994
2995 CpvDeclare(int, CmiImmediateMsgHandlerIdx); /* Main handler that is run on every node */
2996
2997 /* xdl is the real handler */
2998 static void CmiImmediateMsgHandler(char *msg)
2999 {
3000   CmiSetHandler(msg, CmiGetXHandler(msg));
3001   CmiHandleMessage(msg);
3002 }
3003
3004 void CmiInitImmediateMsg(void)
3005 {
3006   CpvInitialize(int,CmiImmediateMsgHandlerIdx); 
3007   CpvAccess(CmiImmediateMsgHandlerIdx) =
3008     CmiRegisterHandler((CmiHandler)CmiImmediateMsgHandler);
3009 }
3010
3011 /*#if !CMK_IMMEDIATE_MSG
3012 #if !CMK_MACHINE_PROGRESS_DEFINED
3013 void CmiProbeImmediateMsg()
3014 {
3015 }
3016 #endif
3017 #endif*/
3018
3019 /******** Idle timeout module (+idletimeout=30) *********/
3020
3021 typedef struct {
3022   int idle_timeout;/*Milliseconds to wait idle before aborting*/
3023   int is_idle;/*Boolean currently-idle flag*/
3024   int call_count;/*Number of timeout calls currently in flight*/
3025 } cmi_cpu_idlerec;
3026
3027 static void on_timeout(cmi_cpu_idlerec *rec,double curWallTime)
3028 {
3029   rec->call_count--;
3030   if(rec->call_count==0 && rec->is_idle==1) {
3031     CmiError("Idle time on PE %d exceeded specified timeout.\n", CmiMyPe());
3032     CmiAbort("Exiting.\n");
3033   }
3034 }
3035 static void on_idle(cmi_cpu_idlerec *rec,double curWallTime)
3036 {
3037   CcdCallFnAfter((CcdVoidFn)on_timeout, rec, rec->idle_timeout);
3038   rec->call_count++; /*Keeps track of overlapping timeout calls.*/  
3039   rec->is_idle = 1;
3040 }
3041 static void on_busy(cmi_cpu_idlerec *rec,double curWallTime)
3042 {
3043   rec->is_idle = 0;
3044 }
3045 static void CIdleTimeoutInit(char **argv)
3046 {
3047   int idle_timeout=0; /*Seconds to wait*/
3048   CmiGetArgIntDesc(argv,"+idle-timeout",&idle_timeout,"Abort if idle for this many seconds");
3049   if(idle_timeout != 0) {
3050     cmi_cpu_idlerec *rec=(cmi_cpu_idlerec *)malloc(sizeof(cmi_cpu_idlerec));
3051     _MEMCHECK(rec);
3052     rec->idle_timeout=idle_timeout*1000;
3053     rec->is_idle=0;
3054     rec->call_count=0;
3055     CcdCallOnCondition(CcdPROCESSOR_BEGIN_IDLE, (CcdVoidFn)on_idle, rec);
3056     CcdCallOnCondition(CcdPROCESSOR_BEGIN_BUSY, (CcdVoidFn)on_busy, rec);
3057   }
3058 }
3059
3060 /*****************************************************************************
3061  *
3062  * Converse Initialization
3063  *
3064  *****************************************************************************/
3065
3066 extern void CrnInit(void);
3067 extern void CmiIsomallocInit(char **argv);
3068 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
3069 void CmiIOInit(char **argv);
3070 #endif
3071
3072 static void CmiProcessPriority(char **argv)
3073 {
3074   int dummy, nicelevel=-100;      /* process priority */
3075   CmiGetArgIntDesc(argv,"+nice",&nicelevel,"Set the process priority level");
3076   /* ignore others */
3077   while (CmiGetArgIntDesc(argv,"+nice",&dummy,"Set the process priority level"));
3078   /* call setpriority once on each process to set process's priority */
3079   if (CmiMyRank() == 0 && nicelevel != -100)  {
3080 #ifndef _WIN32
3081     if (0!=setpriority(PRIO_PROCESS, 0, nicelevel))  {
3082       CmiPrintf("[%d] setpriority failed with value %d. \n", CmiMyPe(), nicelevel);
3083       perror("setpriority");
3084       CmiAbort("setpriority failed.");
3085     }
3086     else
3087       CmiPrintf("[%d] Charm++: setpriority %d\n", CmiMyPe(), nicelevel);
3088 #else
3089     HANDLE hProcess = GetCurrentProcess();
3090     DWORD dwPriorityClass = NORMAL_PRIORITY_CLASS;
3091     char *prio_str = "NORMAL_PRIORITY_CLASS";
3092     BOOL status;
3093     /*
3094        <-20:      real time
3095        -20--10:   high 
3096        -10-0:     above normal
3097        0:         normal
3098        0-10:      below normal
3099        10-:       idle
3100     */
3101     if (0) ;
3102 #ifdef BELOW_NORMAL_PRIORITY_CLASS
3103     else if (nicelevel<10 && nicelevel>0) {
3104       dwPriorityClass = BELOW_NORMAL_PRIORITY_CLASS;
3105       prio_str = "BELOW_NORMAL_PRIORITY_CLASS";
3106     }
3107 #endif
3108     else if (nicelevel>0) {
3109       dwPriorityClass = IDLE_PRIORITY_CLASS;
3110       prio_str = "IDLE_PRIORITY_CLASS";
3111     }
3112     else if (nicelevel<=-20) {
3113       dwPriorityClass = REALTIME_PRIORITY_CLASS;
3114       prio_str = "REALTIME_PRIORITY_CLASS";
3115     }
3116 #ifdef ABOVE_NORMAL_PRIORITY_CLASS
3117     else if (nicelevel>-10 && nicelevel<0) {
3118       dwPriorityClass = ABOVE_NORMAL_PRIORITY_CLASS;
3119       prio_str = "ABOVE_NORMAL_PRIORITY_CLASS";
3120     }
3121 #endif
3122     else if (nicelevel<0) {
3123       dwPriorityClass = HIGH_PRIORITY_CLASS;
3124       prio_str = "HIGH_PRIORITY_CLASS";
3125     }
3126     status = SetPriorityClass(hProcess, dwPriorityClass);
3127     if (!status)  {
3128         int err=GetLastError();
3129         CmiPrintf("SetPriorityClass failed errno=%d, WSAerr=%d\n",errno, err);
3130         CmiAbort("SetPriorityClass failed.");
3131     }
3132     else
3133       CmiPrintf("[%d] Charm++: setpriority %s\n", CmiMyPe(), prio_str);
3134 #endif
3135   }
3136 }
3137
3138 void CommunicationServerInit()
3139 {
3140 #if CMK_IMMEDIATE_MSG
3141   CQdCpvInit();
3142   CpvInitialize(int,CmiImmediateMsgHandlerIdx); 
3143 #endif
3144 }
3145
3146
3147 static int testEndian(void)
3148 {
3149         int test=0x1c;
3150         unsigned char *c=(unsigned char *)&test;
3151         if (c[sizeof(int)-1]==0x1c)
3152                 /* Macintosh and most workstations are big-endian */
3153                 return 1;   /* Big-endian machine */
3154         if (c[0]==0x1c)
3155                 /* Intel x86 PC's, and DEC VAX are little-endian */
3156                 return 0;  /* Little-endian machine */
3157         return -2;  /*Unknown integer type */
3158 }
3159
3160 int CmiEndianness()
3161 {
3162   static int _cmi_endianness = -1;
3163   if (_cmi_endianness == -1) _cmi_endianness = testEndian();
3164   CmiAssert(_cmi_endianness != -2);
3165   return  _cmi_endianness;
3166 }
3167
3168 /**
3169   Main Converse initialization routine.  This routine is 
3170   called by the machine file (machine.c) to set up Converse.
3171   It's "Common" because it's shared by all the machine.c files. 
3172   
3173   The main task of this routine is to set up all the Cpv's
3174   (message queues, handler tables, etc.) used during main execution.
3175   
3176   On SMP versions, this initialization routine is called by 
3177   *all* processors of a node simultaniously.  It's *also* called
3178   by the communication thread, which is rather strange but needed
3179   for immediate messages.  Each call to this routine expects a 
3180   different copy of the argv arguments, so use CmiCopyArgs(argv).
3181   
3182   Requires:
3183     - A working network layer.
3184     - Working Cpv's and CmiNodeBarrier.
3185     - CthInit to already have been called.  CthInit is called
3186       from the machine layer directly, because some machine layers
3187       (like uth) use Converse threads internally.
3188
3189   Initialization is somewhat subtle, in that various modules
3190   won't work properly until they're initialized.  For example,
3191   nobody can register handlers before calling CmiHandlerInit.
3192 */
3193 void ConverseCommonInit(char **argv)
3194 {
3195
3196 /**
3197  * The reason to initialize this variable here:
3198  * cmiArgDebugFlag is possibly accessed in CmiPrintf/CmiError etc.,
3199  * therefore, we have to initialize this variable before any calls
3200  * to those functions (such as CmiPrintf). Otherwise, we may encounter
3201  * a memory segmentation fault (bad memory access). Note, even
3202  * testing CpvInitialized(cmiArgDebugFlag) doesn't help to solve
3203  * this problem because the variable indicating whether cmiArgDebugFlag is 
3204  * initialized or not is not initialized, thus possibly causing another
3205  * bad memory access. --Chao Mei
3206  */
3207 #if CMK_CCS_AVAILABLE
3208   CpvInitialize(int, cmiArgDebugFlag);
3209 #endif
3210
3211   CmiArgInit(argv);
3212   CmiMemoryInit(argv);
3213 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
3214   CmiIOInit(argv);
3215 #endif
3216 #if CONVERSE_POOL
3217   CmiPoolAllocInit(30);  
3218 #endif
3219   CmiTmpInit(argv);
3220   CmiTimerInit();
3221   CstatsInit(argv);
3222
3223   CcdModuleInit(argv);
3224   CmiHandlerInit();
3225   CmiReductionsInit();
3226   CIdleTimeoutInit(argv);
3227   
3228 #if CMK_SHARED_VARS_POSIX_THREADS_SMP /*Used by the net-*-smp versions*/
3229   if(CmiGetArgFlagDesc(argv,"+CmiNoProcForComThread","Is there an extra processor for the communication thread on each node(only for net-smp-*) ?")){
3230     if(CmiMyRank() == 0) _Cmi_noprocforcommthread=1;
3231    }
3232 #endif
3233         
3234 #if CMK_TRACE_ENABLED
3235   traceInit(argv);
3236 /*initTraceCore(argv);*/ /* projector */
3237 #endif
3238   CmiProcessPriority(argv);
3239
3240   CmiPersistentInit();
3241   CmiIsomallocInit(argv);
3242   CmiDeliversInit();
3243   CsdInit(argv);
3244 #if CMK_CCS_AVAILABLE
3245   CcsInit(argv);
3246 #endif
3247   CpdInit();
3248   CthSchedInit();
3249   CmiGroupInit();
3250   CmiMulticastInit();
3251   CmiInitMultipleSend();
3252   CQdInit();
3253
3254   CrnInit();
3255   CmiInitImmediateMsg();
3256   CldModuleInit(argv);
3257   
3258 #if CMK_CELL
3259   void CmiInitCell();
3260   CmiInitCell();
3261 #endif
3262
3263 #ifdef CMK_CUDA
3264   initHybridAPI(CmiMyPe()); 
3265 #endif
3266
3267   /* main thread is suspendable */
3268 /*
3269   CthSetSuspendable(CthSelf(), 0);
3270 */
3271
3272 #if CMK_BLUEGENE_CHARM
3273    /* have to initialize QD here instead of _initCharm */
3274   extern void initQd(char **argv);
3275   initQd(argv);
3276 #endif
3277 }
3278
3279 void ConverseCommonExit(void)
3280 {
3281   CcsImpl_kill();
3282
3283 #if CMK_TRACE_ENABLED
3284   traceClose();
3285 /*closeTraceCore();*/ /* projector */
3286 #endif
3287
3288 #if CMI_IO_BUFFER_EXPLICIT
3289   CmiFlush(stdout);  /* end of program, always flush */
3290 #endif
3291
3292 #if CMK_CELL
3293   CloseOffloadAPI();
3294 #endif
3295
3296 #if CMK_CUDA
3297   exitHybridAPI(); 
3298 #endif
3299
3300   EmergencyExit();
3301 }
3302
3303
3304 #if CMK_CELL != 0
3305
3306 extern void register_accel_spe_funcs(void);
3307
3308 void CmiInitCell()
3309 {
3310   // Create a unique string for each PPE to use for the timing
3311   //   data file's name
3312   char fileNameBuf[64];
3313   sprintf(fileNameBuf, "speTiming.%d", CmiMyPe());
3314
3315   InitOffloadAPI(offloadCallback, NULL, NULL, fileNameBuf);
3316   //CcdCallOnConditionKeep(CcdPERIODIC, 
3317   //      (CcdVoidFn) OffloadAPIProgress, NULL);
3318   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
3319       (CcdVoidFn) OffloadAPIProgress, NULL);
3320
3321   // Register accelerated entry methods on the PPE
3322   register_accel_spe_funcs();
3323 }
3324
3325 #include "cell-api.c"
3326
3327 #endif
3328
3329 /****
3330  * CW Lee - 9/14/2005
3331  * Added a mechanism to allow some control over machines with extremely
3332  * inefficient terminal IO mechanisms. Case in point: the XT3 has a
3333  * 20ms flush overhead along with about 25MB/s bandwidth for IO. This,
3334  * coupled with a default setup using unbuffered stdout introduced
3335  * severe overheads (and hence limiting scaling) for applications like 
3336  * NAMD.
3337  */
3338 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
3339 void CmiIOInit(char **argv) {
3340   CpvInitialize(int, expIOFlushFlag);
3341 #if CMI_IO_BUFFER_EXPLICIT
3342   /* 
3343      Support for an explicit buffer only makes sense if the machine
3344      layer does not wish to make its own implementation.
3345
3346      Placing this after CmiMemoryInit() means that CmiMemoryInit()
3347      MUST NOT make use of stdout if an explicit buffer is requested.
3348
3349      The setvbuf function may only be used after opening a stream and
3350      before any other operations have been performed on it
3351   */
3352   CpvInitialize(char*, explicitIOBuffer);
3353   CpvInitialize(int, expIOBufferSize);
3354   if (!CmiGetArgIntDesc(argv,"+io_buffer_size", &CpvAccess(expIOBufferSize),
3355                         "Explicit IO Buffer Size")) {
3356     CpvAccess(expIOBufferSize) = DEFAULT_IO_BUFFER_SIZE;
3357   }
3358   if (CpvAccess(expIOBufferSize) <= 0) {
3359     CpvAccess(expIOBufferSize) = DEFAULT_IO_BUFFER_SIZE;
3360   }
3361   CpvAccess(explicitIOBuffer) = (char*)CmiAlloc(CpvAccess(expIOBufferSize)*
3362                                                 sizeof(char));
3363   if (setvbuf(stdout, CpvAccess(explicitIOBuffer), _IOFBF, 
3364               CpvAccess(expIOBufferSize))) {
3365     CmiAbort("Explicit IO Buffering failed\n");
3366   }
3367 #endif
3368 #if CMI_IO_FLUSH_USER
3369   /* system default to have user control flushing of IO */
3370   /* Now look for user override */
3371   CpvAccess(expIOFlushFlag) = !CmiGetArgFlagDesc(argv,"+io_flush_system",
3372                                                  "System Controls IO Flush");
3373 #else
3374   /* system default to have system handle IO flushing */
3375   /* Now look for user override */
3376   CpvAccess(expIOFlushFlag) = CmiGetArgFlagDesc(argv,"+io_flush_user",
3377                                                 "User Controls IO Flush");
3378 #endif
3379 }
3380 #endif
3381
3382 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
3383
3384 void CmiPrintf(const char *format, ...)
3385 {
3386   CpdSystemEnter();
3387   {
3388   va_list args;
3389   va_start(args,format);
3390   vfprintf(stdout,format, args);
3391   if (CpvInitialized(expIOFlushFlag) && !CpvAccess(expIOFlushFlag)) {
3392     CmiFlush(stdout);
3393   }
3394   va_end(args);
3395 #if CMK_CCS_AVAILABLE
3396   if (CpvAccess(cmiArgDebugFlag)) {
3397     va_start(args,format);
3398     print_node0(format, args);
3399     va_end(args);
3400   }
3401 #endif
3402   }
3403   CpdSystemExit();
3404 }
3405
3406 void CmiError(const char *format, ...)
3407 {
3408   CpdSystemEnter();
3409   {
3410   va_list args;
3411   va_start(args,format);
3412   vfprintf(stderr,format, args);
3413   CmiFlush(stderr);  /* stderr is always flushed */
3414   va_end(args);
3415 #if CMK_CCS_AVAILABLE
3416   if (CpvAccess(cmiArgDebugFlag)) {
3417     va_start(args,format);
3418     print_node0(format, args);
3419     va_end(args);
3420   }
3421 #endif
3422   }
3423   CpdSystemExit();
3424 }
3425
3426 #endif
3427
3428 void __cmi_assert(const char *expr, const char *file, int line)
3429 {
3430   CmiError("[%d] Assertion \"%s\" failed in file %s line %d.\n",
3431       CmiMyPe(), expr, file, line);
3432   CmiAbort("");
3433 }
3434
3435 char *CmiCopyMsg(char *msg, int len)
3436 {
3437   char *copy = (char *)CmiAlloc(len);
3438   _MEMCHECK(copy);
3439   memcpy(copy, msg, len);
3440   return copy;
3441 }
3442
3443 unsigned char computeCheckSum(unsigned char *data, int len)
3444 {
3445   int i;
3446   unsigned char ret = 0;
3447   for (i=0; i<len; i++) ret ^= (unsigned char)data[i];
3448   return ret;
3449 }
3450
3451 /* Flag for bigsim's out-of-core emulation */
3452 int _BgOutOfCoreFlag=0; /*indicate the type of memory operation (in or out) */
3453 int _BgInOutOfCoreMode=0; /*indicate whether the emulation is in the out-of-core emulation mode */
3454
3455 #if !CMK_HAS_LOG2
3456 unsigned int CmiLog2(unsigned int val) {
3457   unsigned int log = 0u;
3458   if ( val != 0u ) {
3459       while ( val > (1u<<log) ) { log++; }
3460   }
3461   return log;
3462 }
3463 #endif
3464
3465 /* for bigsim */
3466 int CmiMyRank_()
3467 {
3468   return CmiMyRank();
3469 }
3470
3471 /*@}*/