fix for migration
[charm.git] / src / conv-core / convcore.c
1 /** 
2   @defgroup Converse
3   \brief Converse--a parallel portability layer.
4
5   Converse is the lowest level inside the Charm++ hierarchy. It stands on top
6   of the machine layer, and it provides all the common functionality across
7   platforms.
8
9   One converse program is running on every processor (or node in the smp
10   version). it manages the message transmission, and the memory allocation.
11   Charm++, which is on top of Converse, uses its functionality for
12   interprocess *communication.
13
14   In order to maintain multiple independent objects inside a single user space
15   program, it uses a personalized version of threads, which can be executed,
16   suspended, and migrated across processors.
17
18   It provides a scheduler for message delivery: methods can be registered to
19   the scheduler, and then messages allocated through CmiAlloc can be sent to
20   the correspondent method in a remote processor. This is done through the
21   converse header (which has few common fields, but is architecture dependent).
22
23   @defgroup ConverseScheduler 
24   \brief The portion of Converse responsible for scheduling the execution of 
25   incoming messages.
26   
27   Converse provides a scheduler for message delivery: methods can be registered to 
28   the scheduler, and then messages allocated through CmiAlloc can be sent to 
29   the correspondent method in a remote processor. This is done through the
30   converse header (which has few common fields, but is architecture dependent).
31
32   In converse the CsdScheduleForever() routine will run an infinite while loop that
33   looks for available messages to process from the unprocessed message queues. The
34   details of the queues and the order in which they are emptied is hidden behind
35   CsdNextMessage(), which is used to dequeue the next message for processing by the
36   converse scheduler. When a message is taken from the queue it is then passed into
37   CmiHandleMessage() which calls the handler associated with the message.
38
39   Incoming messages that are destined for Charm++ will be passed to the 
40   \ref CharmScheduler "charm scheduling routines".
41
42   @file
43   converse main core
44   @ingroup Converse
45   @ingroup ConverseScheduler
46  
47   @addtogroup Converse
48   @{
49
50 */
51
52 #include <stdio.h>
53 #include <stdlib.h>
54 #include <string.h>
55 #include <errno.h>
56 #ifndef _WIN32
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #endif
60
61 #include "converse.h"
62 #include "conv-trace.h"
63 #include "sockRoutines.h"
64 #include "queueing.h"
65 #include "conv-ccs.h"
66 #include "ccs-server.h"
67 #include "memory-isomalloc.h"
68 #if CMK_PROJECTOR
69 #include "converseEvents.h"             /* projector */
70 #include "traceCoreCommon.h"    /* projector */
71 #include "machineEvents.h"     /* projector */
72 #endif
73
74 extern const char * const CmiCommitID;
75
76 #if CMK_OUT_OF_CORE
77 #include "conv-ooc.h"
78 #endif
79
80 #if CONVERSE_POOL
81 #include "cmipool.h"
82 #endif
83
84 #if CMK_CONDS_USE_SPECIAL_CODE
85 CmiSwitchToPEFnPtr CmiSwitchToPE;
86 #endif
87
88 CpvExtern(int, _traceCoreOn);   /* projector */
89 extern void CcdModuleInit(char **);
90 extern void CmiMemoryInit(char **);
91 extern void CldModuleInit(char **);
92
93 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
94 #include <sys/types.h>
95 #include <sys/time.h>
96 #endif
97
98 #if CMK_TIMER_USE_TIMES
99 #include <sys/times.h>
100 #include <limits.h>
101 #include <unistd.h>
102 #endif
103
104 #if CMK_TIMER_USE_GETRUSAGE
105 #include <sys/time.h>
106 #include <sys/resource.h>
107 #endif
108
109 #if CMK_TIMER_USE_RDTSC
110 #include <string.h>
111 #include <unistd.h>
112 #include <time.h>
113 #include <stdio.h>
114 #include <sys/time.h>
115 #include <sys/resource.h>
116 #endif
117
118 #ifdef CMK_TIMER_USE_WIN32API
119 #include <stdlib.h>
120 #include <time.h>
121 #include <sys/types.h>
122 #include <sys/timeb.h>
123 #endif
124
125 #ifdef CMK_HAS_ASCTIME
126 #include <time.h>
127 #endif
128
129 #ifdef CMK_CUDA
130 #include "cuda-hybrid-api.h"
131 #endif
132
133 #include "quiescence.h"
134
135 #if USE_MPI_CTRLMSG_SCHEME && CMK_CONVERSE_MPI
136 #include <mpi.h>
137 #endif
138
139 //int cur_restart_phase = 1;      /* checkpointing/restarting phase counter */
140 CpvDeclare(int,_curRestartPhase);
141 static int CsdLocalMax = CSD_LOCAL_MAX_DEFAULT;
142
143 int CharmLibInterOperate = 0;
144 CpvDeclare(int,charmLibExitFlag);
145
146 CpvStaticDeclare(int, CmiMainHandlerIDP); /* Main handler for _CmiMultipleSend that is run on every node */
147
148 #if CMK_MEM_CHECKPOINT
149 void (*notify_crash_fn)(int) = NULL;
150 #endif
151
152 CpvDeclare(char *, _validProcessors);
153
154 /*****************************************************************************
155  *
156  * Unix Stub Functions
157  *
158  ****************************************************************************/
159
160 #ifdef MEMMONITOR
161 typedef unsigned long mmulong;
162 CpvDeclare(mmulong,MemoryUsage);
163 CpvDeclare(mmulong,HiWaterMark);
164 CpvDeclare(mmulong,ReportedHiWaterMark);
165 CpvDeclare(int,AllocCount);
166 CpvDeclare(int,BlocksAllocated);
167 #endif
168
169 #define MAX_HANDLERS 512
170
171 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
172 CpvDeclare(int,expIOFlushFlag);
173 #if CMI_IO_BUFFER_EXPLICIT
174 /* 250k not too large depending on how slow terminal IO is */
175 #define DEFAULT_IO_BUFFER_SIZE 250000
176 CpvDeclare(char*,explicitIOBuffer);
177 CpvDeclare(int,expIOBufferSize);
178 #endif
179 #endif
180
181 #if CMK_NODE_QUEUE_AVAILABLE
182 void  *CmiGetNonLocalNodeQ();
183 #endif
184
185 CpvDeclare(void*, CsdSchedQueue);
186
187 #if CMK_OUT_OF_CORE
188 /* The Queue where the Prefetch Thread puts the messages from CsdSchedQueue  */
189 CpvDeclare(void*, CsdPrefetchQueue);
190 pthread_mutex_t prefetchLock;
191 #endif
192
193 #if CMK_NODE_QUEUE_AVAILABLE
194 CsvDeclare(void*, CsdNodeQueue);
195 CsvDeclare(CmiNodeLock, CsdNodeQueueLock);
196 #endif
197 CpvDeclare(int,   CsdStopFlag);
198 CpvDeclare(int,   CsdLocalCounter);
199
200 CpvDeclare(int,   _urgentSend);
201
202 CmiNodeLock _smp_mutex;               /* for smp */
203
204 #if CONVERSE_VERSION_VMI
205 void *CMI_VMI_CmiAlloc (int size);
206 void CMI_VMI_CmiFree (void *ptr);
207 #endif
208
209 #if CONVERSE_VERSION_ELAN
210 void* elan_CmiAlloc(int size);
211 #endif
212
213 #if CMK_USE_IBVERBS | CMK_USE_IBUD
214 void *infi_CmiAlloc(int size);
215 void infi_CmiFree(void *ptr);
216 void infi_freeMultipleSend(void *ptr);
217 void infi_unregAndFreeMeta(void *ch);
218 #endif
219
220 #if CMK_BLUEGENEQ && CMK_USE_L2ATOMICS
221 void * CmiAlloc_bgq (int     size);
222 void   CmiFree_bgq  (void  * buf);
223 #endif
224
225 #if CMK_GRID_QUEUE_AVAILABLE
226 CpvDeclare(void *, CkGridObject);
227 CpvDeclare(void *, CsdGridQueue);
228 #endif
229
230 #if CMK_CRAYXE
231 void* LrtsAlloc(int, int);
232 void  LrtsFree(void*);
233 #endif
234
235 CpvStaticDeclare(int, cmiMyPeIdle);
236 int CmiIsMyNodeIdle();
237
238 /*****************************************************************************
239  *
240  * Command-Line Argument (CLA) parsing routines.
241  *
242  *****************************************************************************/
243
244 static int usageChecked=0; /* set when argv has been searched for a usage request */
245 static int printUsage=0; /* if set, print command-line usage information */
246 static const char *CLAformatString="%20s %10s %s\n";
247
248 /** This little list of CLA's holds the argument descriptions until it's
249    safe to print them--it's needed because the net- versions don't have 
250    printf until they're pretty well started.
251  */
252 typedef struct {
253         const char *arg; /* Flag name, like "-foo"*/
254         const char *param; /* Argument's parameter type, like "integer" or "none"*/
255         const char *desc; /* Human-readable description of what it does */
256 } CLA;
257 static int CLAlistLen=0;
258 static int CLAlistMax=0;
259 static CLA *CLAlist=NULL;
260
261 /** Add this CLA */
262 static void CmiAddCLA(const char *arg,const char *param,const char *desc) {
263         int i;
264         if (CmiMyPe()!=0) return; /*Don't bother if we're not PE 0*/
265         if (desc==NULL) return; /*It's an internal argument*/
266         if (usageChecked) { /* Printf should work now */
267                 if (printUsage)
268                         CmiPrintf(CLAformatString,arg,param,desc);
269         }
270         else { /* Printf doesn't work yet-- just add to the list.
271                 This assumes the const char *'s are static references,
272                 which is probably reasonable. */
273                 i=CLAlistLen++;
274                 if (CLAlistLen>CLAlistMax) { /*Grow the CLA list */
275                         CLAlistMax=16+2*CLAlistLen;
276                         CLAlist=realloc(CLAlist,sizeof(CLA)*CLAlistMax);\
277                 }
278                 CLAlist[i].arg=arg;
279                 CLAlist[i].param=param;
280                 CLAlist[i].desc=desc;
281         }
282 }
283
284 /** Print out the stored list of CLA's */
285 static void CmiPrintCLAs(void) {
286         int i;
287         if (CmiMyPe()!=0) return; /*Don't bother if we're not PE 0*/
288         CmiPrintf("Converse Machine Command-line Parameters:\n ");
289         CmiPrintf(CLAformatString,"Option:","Parameter:","Description:");
290         for (i=0;i<CLAlistLen;i++) {
291                 CLA *c=&CLAlist[i];
292                 CmiPrintf(CLAformatString,c->arg,c->param,c->desc);
293         }
294 }
295
296 /**
297  * Determines if command-line usage information should be printed--
298  * that is, if a "-?", "-h", or "--help" flag is present.
299  * Must be called after printf is setup.
300  */
301 void CmiArgInit(char **argv) {
302         int i;
303         for (i=0;argv[i]!=NULL;i++)
304         {
305                 if (0==strcmp(argv[i],"-?") ||
306                     0==strcmp(argv[i],"-h") ||
307                     0==strcmp(argv[i],"--help")) 
308                 {
309                         printUsage=1;
310                         /* Don't delete arg:  CmiDeleteArgs(&argv[i],1);
311                           Leave it there for user program to see... */
312                         CmiPrintCLAs();
313                 }
314         }
315         if (CmiMyPe()==0) { /* Throw away list of stored CLA's */
316                 CLAlistLen=CLAlistMax=0;
317                 free(CLAlist); CLAlist=NULL;
318         }
319         usageChecked=1;
320 }
321
322 /** Return 1 if we're currently printing command-line usage information. */
323 int CmiArgGivingUsage(void) {
324         return (CmiMyPe()==0) && printUsage;
325 }
326
327 /** Identifies the module that accepts the following command-line parameters */
328 void CmiArgGroup(const char *parentName,const char *groupName) {
329         if (CmiArgGivingUsage()) {
330                 if (groupName==NULL) groupName=parentName; /* Start of a new group */
331                 CmiPrintf("\n%s Command-line Parameters:\n",groupName);
332         }
333 }
334
335 /** Count the number of non-NULL arguments in list*/
336 int CmiGetArgc(char **argv)
337 {
338         int i=0,argc=0;
339         if (argv)
340         while (argv[i++]!=NULL)
341                 argc++;
342         return argc;
343 }
344
345 /** Return a new, heap-allocated copy of the argv array*/
346 char **CmiCopyArgs(char **argv)
347 {
348         int argc=CmiGetArgc(argv);
349         char **ret=(char **)malloc(sizeof(char *)*(argc+1));
350         int i;
351         for (i=0;i<argc;i++) {
352         //      printf("Copyin argc %d/%d of lenght %d %s\n",i,argc,strlen(argv[i]), argv[i]);
353         //      ret[i] = (char*)malloc(strlen(argv[i])*sizeof(char));
354         //      memcpy(ret[i],argv[i],strlen(argv[i])*sizeof(char)); 
355           ret[i] = argv[i];
356         }
357         return ret;
358 }
359
360 /** Delete the first k argument from the given list, shifting
361 all other arguments down by k spaces.
362 e.g., argv=={"a","b","c","d",NULL}, k==3 modifies
363 argv={"d",NULL,"c","d",NULL}
364 */
365 void CmiDeleteArgs(char **argv,int k)
366 {
367         int i=0;
368         while ((argv[i]=argv[i+k])!=NULL)
369                 i++;
370 }
371
372 /** Find the given argment and string option in argv.
373 If the argument is present, set the string option and
374 delete both from argv.  If not present, return NULL.
375 e.g., arg=="-name" returns "bob" from
376 argv=={"a.out","foo","-name","bob","bar"},
377 and sets argv={"a.out","foo","bar"};
378 */
379 int CmiGetArgStringDesc(char **argv,const char *arg,char **optDest,const char *desc)
380 {
381         int i;
382         CmiAddCLA(arg,"string",desc);
383         for (i=0;argv[i]!=NULL;i++)
384                 if (0==strcmp(argv[i],arg))
385                 {/*We found the argument*/
386                         if (argv[i+1]==NULL) CmiAbort("Argument not complete!");
387                         *optDest=argv[i+1];
388                         CmiDeleteArgs(&argv[i],2);
389                         return 1;
390                 }
391         return 0;/*Didn't find the argument*/
392 }
393 int CmiGetArgString(char **argv,const char *arg,char **optDest) {
394         return CmiGetArgStringDesc(argv,arg,optDest,"");
395 }
396
397 /** Find the given argument and floating-point option in argv.
398 Remove it and return 1; or return 0.
399 */
400 int CmiGetArgDoubleDesc(char **argv,const char *arg,double *optDest,const char *desc) {
401         char *number=NULL;
402         CmiAddCLA(arg,"number",desc);
403         if (!CmiGetArgStringDesc(argv,arg,&number,NULL)) return 0;
404         if (1!=sscanf(number,"%lg",optDest)) return 0;
405         return 1;
406 }
407 int CmiGetArgDouble(char **argv,const char *arg,double *optDest) {
408         return CmiGetArgDoubleDesc(argv,arg,optDest,"");
409 }
410
411 /** Find the given argument and integer option in argv.
412 If the argument is present, parse and set the numeric option,
413 delete both from argv, and return 1. If not present, return 0.
414 e.g., arg=="-pack" matches argv=={...,"-pack","27",...},
415 argv=={...,"-pack0xf8",...}, and argv=={...,"-pack=0777",...};
416 but not argv=={...,"-packsize",...}.
417 */
418 int CmiGetArgIntDesc(char **argv,const char *arg,int *optDest,const char *desc)
419 {
420         int i;
421         int argLen=strlen(arg);
422         CmiAddCLA(arg,"integer",desc);
423         for (i=0;argv[i]!=NULL;i++)
424                 if (0==strncmp(argv[i],arg,argLen))
425                 {/*We *may* have found the argument*/
426                         const char *opt=NULL;
427                         int nDel=0;
428                         switch(argv[i][argLen]) {
429                         case 0: /* like "-p","27" */
430                                 opt=argv[i+1]; nDel=2; break;
431                         case '=': /* like "-p=27" */
432                                 opt=&argv[i][argLen+1]; nDel=1; break;
433                         case '-':case '+':
434                         case '0':case '1':case '2':case '3':case '4':
435                         case '5':case '6':case '7':case '8':case '9':
436                                 /* like "-p27" */
437                                 opt=&argv[i][argLen]; nDel=1; break;
438                         default:
439                                 continue; /*False alarm-- skip it*/
440                         }
441                         if (opt==NULL) continue; /*False alarm*/
442                         if (sscanf(opt,"%i",optDest)<1) {
443                         /*Bad command line argument-- die*/
444                                 fprintf(stderr,"Cannot parse %s option '%s' "
445                                         "as an integer.\n",arg,opt);
446                                 CmiAbort("Bad command-line argument\n");
447                         }
448                         CmiDeleteArgs(&argv[i],nDel);
449                         return 1;
450                 }
451         return 0;/*Didn't find the argument-- dest is unchanged*/       
452 }
453 int CmiGetArgInt(char **argv,const char *arg,int *optDest) {
454         return CmiGetArgIntDesc(argv,arg,optDest,"");
455 }
456
457 int CmiGetArgLongDesc(char **argv,const char *arg,CmiInt8 *optDest,const char *desc)
458 {
459         int i;
460         int argLen=strlen(arg);
461         CmiAddCLA(arg,"integer",desc);
462         for (i=0;argv[i]!=NULL;i++)
463                 if (0==strncmp(argv[i],arg,argLen))
464                 {/*We *may* have found the argument*/
465                         const char *opt=NULL;
466                         int nDel=0;
467                         switch(argv[i][argLen]) {
468                         case 0: /* like "-p","27" */
469                                 opt=argv[i+1]; nDel=2; break;
470                         case '=': /* like "-p=27" */
471                                 opt=&argv[i][argLen+1]; nDel=1; break;
472                         case '-':case '+':
473                         case '0':case '1':case '2':case '3':case '4':
474                         case '5':case '6':case '7':case '8':case '9':
475                                 /* like "-p27" */
476                                 opt=&argv[i][argLen]; nDel=1; break;
477                         default:
478                                 continue; /*False alarm-- skip it*/
479                         }
480                         if (opt==NULL) continue; /*False alarm*/
481                         if (sscanf(opt,"%ld",optDest)<1) {
482                         /*Bad command line argument-- die*/
483                                 fprintf(stderr,"Cannot parse %s option '%s' "
484                                         "as a long integer.\n",arg,opt);
485                                 CmiAbort("Bad command-line argument\n");
486                         }
487                         CmiDeleteArgs(&argv[i],nDel);
488                         return 1;
489                 }
490         return 0;/*Didn't find the argument-- dest is unchanged*/       
491 }
492 int CmiGetArgLong(char **argv,const char *arg,CmiInt8 *optDest) {
493         return CmiGetArgLongDesc(argv,arg,optDest,"");
494 }
495
496 /** Find the given argument in argv.  If present, delete
497 it and return 1; if not present, return 0.
498 e.g., arg=="-foo" matches argv=={...,"-foo",...} but not
499 argv={...,"-foobar",...}.
500 */
501 int CmiGetArgFlagDesc(char **argv,const char *arg,const char *desc)
502 {
503         int i;
504         CmiAddCLA(arg,"",desc);
505         for (i=0;argv[i]!=NULL;i++)
506                 if (0==strcmp(argv[i],arg))
507                 {/*We found the argument*/
508                         CmiDeleteArgs(&argv[i],1);
509                         return 1;
510                 }
511         return 0;/*Didn't find the argument*/
512 }
513 int CmiGetArgFlag(char **argv,const char *arg) {
514         return CmiGetArgFlagDesc(argv,arg,"");
515 }
516
517
518 /*****************************************************************************
519  *
520  * Stack tracing routines.
521  *
522  *****************************************************************************/
523 #include "cmibacktrace.c"
524
525 /*
526 Convert "X(Y) Z" to "Y Z"-- remove text prior to first '(', and supress
527 the next parenthesis.  Operates in-place on the character data.
528 or Convert X(Y) to "Y" only, when trimname=1
529 */
530 static char *_implTrimParenthesis(char *str, int trimname) {
531   char *lParen=str, *ret=NULL, *rParen=NULL;
532   while (*lParen!='(') {
533     if (*lParen==0) return str; /* No left parenthesis at all. */
534     lParen++;
535   }
536   /* now *lParen=='(', so trim it*/
537   ret=lParen+1;
538   rParen=ret;
539   while (*rParen!=')') {
540     if (*rParen==0) return ret; /* No right parenthesis at all. */
541     rParen++;
542   }
543   /* now *rParen==')', so trim it*/
544   *rParen=trimname?0:' ';
545   return ret;  
546 }
547
548 /*
549 Return the text description of this trimmed routine name, if 
550 it's a system-generated routine where we should stop printing. 
551 This is probably overkill, but improves the appearance of callbacks.
552 */
553 static const char* _implGetBacktraceSys(const char *name) {
554   if (0==strncmp(name,"_call",5)) 
555   { /*it might be something we're interested in*/
556     if (0==strncmp(name,"_call_",6)) return "Call Entry Method";
557     if (0==strncmp(name,"_callthr_",9)) return "Call Threaded Entry Method";
558   }
559   if (0==strncmp(name,"CthResume",9)) return "Resumed thread";
560   if (0==strncmp(name,"qt_args",7)) return "Converse thread";
561   
562   return 0; /*ordinary user routine-- just print normally*/
563 }
564
565 /** Print out the names of these function pointers. */
566 void CmiBacktracePrint(void **retPtrs,int nLevels) {
567   if (nLevels>0) {
568     int i;
569     char **names=CmiBacktraceLookup(retPtrs,nLevels);
570     if (names==NULL) return;
571     CmiPrintf("[%d] Stack Traceback:\n", CmiMyPe());
572     for (i=0;i<nLevels;i++) {
573       if (names[i] == NULL) continue;
574       {
575       const char *trimmed=_implTrimParenthesis(names[i], 0);
576       const char *print=trimmed;
577       const char *sys=_implGetBacktraceSys(print);
578       if (sys) {
579           CmiPrintf("  [%d] Charm++ Runtime: %s (%s)\n",i,sys,print);
580           break; /*Stop when we hit Charm++ runtime.*/
581       } else {
582           CmiPrintf("  [%d:%d] %s\n",CmiMyPe(),i,print);
583       }
584      }
585     }
586     free(names);
587   }
588 }
589
590 /* Print (to stdout) the names of the functions that have been 
591    called up to this point. nSkip is the number of routines on the
592    top of the stack to *not* print out. */
593 void CmiPrintStackTrace(int nSkip) {
594 #if CMK_USE_BACKTRACE
595         int nLevels=max_stack;
596         void *stackPtrs[max_stack];
597         CmiBacktraceRecord(stackPtrs,1+nSkip,&nLevels);
598         CmiBacktracePrint(stackPtrs,nLevels);
599 #endif
600 }
601
602 int CmiIsFortranLibraryCall() {
603 #if CMK_USE_BACKTRACE
604   int ret = 0;
605   int nLevels=9;
606   void *stackPtrs[18];
607   CmiBacktraceRecord(stackPtrs,1,&nLevels);
608   if (nLevels>0) {
609     int i;
610     char **names=CmiBacktraceLookup(stackPtrs,nLevels);
611     if (names==NULL) return 0;
612     for (i=0;i<nLevels;i++) {
613       if (names[i] == NULL) continue;
614       const char *trimmed=_implTrimParenthesis(names[i], 1);
615       if (strncmp(trimmed, "for__", 5) == 0                /* ifort */
616           || strncmp(trimmed, "_xlf", 4) == 0               /* xlf90 */
617           || strncmp(trimmed, "_xlfBeginIO", 11) == 0 
618           || strncmp(trimmed, "_gfortran_", 10) == 0 
619          )
620           {  /* CmiPrintf("[%d] NAME:%s\n", CmiMyPe(), trimmed); */
621              ret = 1; break; }
622     }
623     free(names);
624   }
625   return ret;
626 #else
627   return 0;
628 #endif
629 }
630
631 /*****************************************************************************
632  *
633  * Statistics: currently, the following statistics are not updated by converse.
634  *
635  *****************************************************************************/
636
637 CpvDeclare(int, CstatsMaxChareQueueLength);
638 CpvDeclare(int, CstatsMaxForChareQueueLength);
639 CpvDeclare(int, CstatsMaxFixedChareQueueLength);
640 CpvStaticDeclare(int, CstatPrintQueueStatsFlag);
641 CpvStaticDeclare(int, CstatPrintMemStatsFlag);
642
643 void CstatsInit(argv)
644 char **argv;
645 {
646
647 #ifdef MEMMONITOR
648   CpvInitialize(mmulong,MemoryUsage);
649   CpvAccess(MemoryUsage) = 0;
650   CpvInitialize(mmulong,HiWaterMark);
651   CpvAccess(HiWaterMark) = 0;
652   CpvInitialize(mmulong,ReportedHiWaterMark);
653   CpvAccess(ReportedHiWaterMark) = 0;
654   CpvInitialize(int,AllocCount);
655   CpvAccess(AllocCount) = 0;
656   CpvInitialize(int,BlocksAllocated);
657   CpvAccess(BlocksAllocated) = 0;
658 #endif
659
660   CpvInitialize(int, CstatsMaxChareQueueLength);
661   CpvInitialize(int, CstatsMaxForChareQueueLength);
662   CpvInitialize(int, CstatsMaxFixedChareQueueLength);
663   CpvInitialize(int, CstatPrintQueueStatsFlag);
664   CpvInitialize(int, CstatPrintMemStatsFlag);
665
666   CpvAccess(CstatsMaxChareQueueLength) = 0;
667   CpvAccess(CstatsMaxForChareQueueLength) = 0;
668   CpvAccess(CstatsMaxFixedChareQueueLength) = 0;
669   CpvAccess(CstatPrintQueueStatsFlag) = 0;
670   CpvAccess(CstatPrintMemStatsFlag) = 0;
671
672 #if 0
673   if (CmiGetArgFlagDesc(argv,"+mems", "Print memory statistics at shutdown"))
674     CpvAccess(CstatPrintMemStatsFlag)=1;
675   if (CmiGetArgFlagDesc(argv,"+qs", "Print queue statistics at shutdown"))
676     CpvAccess(CstatPrintQueueStatsFlag)=1;
677 #endif
678 }
679
680 int CstatMemory(i)
681 int i;
682 {
683   return 0;
684 }
685
686 int CstatPrintQueueStats()
687 {
688   return CpvAccess(CstatPrintQueueStatsFlag);
689 }
690
691 int CstatPrintMemStats()
692 {
693   return CpvAccess(CstatPrintMemStatsFlag);
694 }
695
696 /*****************************************************************************
697  *
698  * Cmi handler registration
699  *
700  *****************************************************************************/
701
702 CpvDeclare(CmiHandlerInfo*, CmiHandlerTable);
703 CpvStaticDeclare(int  , CmiHandlerCount);
704 CpvStaticDeclare(int  , CmiHandlerLocal);
705 CpvStaticDeclare(int  , CmiHandlerGlobal);
706 CpvDeclare(int,         CmiHandlerMax);
707
708 static void CmiExtendHandlerTable(int atLeastLen) {
709     int max = CpvAccess(CmiHandlerMax);
710     int newmax = (atLeastLen+(atLeastLen>>2)+32);
711     int bytes = max*sizeof(CmiHandlerInfo);
712     int newbytes = newmax*sizeof(CmiHandlerInfo);
713     CmiHandlerInfo *nu = (CmiHandlerInfo*)malloc(newbytes);
714     CmiHandlerInfo *tab = CpvAccess(CmiHandlerTable);
715     _MEMCHECK(nu);
716     memcpy(nu, tab, bytes);
717     memset(((char *)nu)+bytes, 0, (newbytes-bytes));
718     free(tab); tab=nu;
719     CpvAccess(CmiHandlerTable) = tab;
720     CpvAccess(CmiHandlerMax) = newmax;
721 }
722
723 void CmiNumberHandler(int n, CmiHandler h)
724 {
725   CmiHandlerInfo *tab;
726   if (n >= CpvAccess(CmiHandlerMax)) CmiExtendHandlerTable(n);
727   tab = CpvAccess(CmiHandlerTable);
728   tab[n].hdlr = (CmiHandlerEx)h; /* LIE!  This assumes extra pointer will be ignored!*/
729   tab[n].userPtr = 0;
730 }
731 void CmiNumberHandlerEx(int n, CmiHandlerEx h,void *userPtr) {
732   CmiHandlerInfo *tab;
733   if (n >= CpvAccess(CmiHandlerMax)) CmiExtendHandlerTable(n);
734   tab = CpvAccess(CmiHandlerTable);
735   tab[n].hdlr = h;
736   tab[n].userPtr=userPtr;
737 }
738
739 #if CMI_LOCAL_GLOBAL_AVAILABLE /*Leave room for local and global handlers*/
740 #  define DIST_BETWEEN_HANDLERS 3
741 #else /*No local or global handlers; ordinary handlers are back-to-back*/
742 #  define DIST_BETWEEN_HANDLERS 1
743 #endif
744
745 int CmiRegisterHandler(CmiHandler h)
746 {
747   int Count = CpvAccess(CmiHandlerCount);
748   CmiNumberHandler(Count, h);
749   CpvAccess(CmiHandlerCount) = Count+DIST_BETWEEN_HANDLERS;
750   return Count;
751 }
752 int CmiRegisterHandlerEx(CmiHandlerEx h,void *userPtr)
753 {
754   int Count = CpvAccess(CmiHandlerCount);
755   CmiNumberHandlerEx(Count, h, userPtr);
756   CpvAccess(CmiHandlerCount) = Count+DIST_BETWEEN_HANDLERS;
757   return Count;
758 }
759
760 #if CMI_LOCAL_GLOBAL_AVAILABLE
761 int CmiRegisterHandlerLocal(h)
762 CmiHandler h;
763 {
764   int Local = CpvAccess(CmiHandlerLocal);
765   CmiNumberHandler(Local, h);
766   CpvAccess(CmiHandlerLocal) = Local+3;
767   return Local;
768 }
769
770 int CmiRegisterHandlerGlobal(h)
771 CmiHandler h;
772 {
773   int Global = CpvAccess(CmiHandlerGlobal);
774   if (CmiMyPe()!=0) 
775     CmiError("CmiRegisterHandlerGlobal must only be called on PE 0.\n");
776   CmiNumberHandler(Global, h);
777   CpvAccess(CmiHandlerGlobal) = Global+3;
778   return Global;
779 }
780 #endif
781
782 static void _cmiZeroHandler(void *msg) {
783         CmiAbort("Converse zero handler executed-- was a message corrupted?\n");
784 }
785
786 static void CmiHandlerInit()
787 {
788   CpvInitialize(CmiHandlerInfo *, CmiHandlerTable);
789   CpvInitialize(int         , CmiHandlerCount);
790   CpvInitialize(int         , CmiHandlerLocal);
791   CpvInitialize(int         , CmiHandlerGlobal);
792   CpvInitialize(int         , CmiHandlerMax);
793   CpvAccess(CmiHandlerCount)  = 0;
794   CpvAccess(CmiHandlerLocal)  = 1;
795   CpvAccess(CmiHandlerGlobal) = 2;
796   CpvAccess(CmiHandlerMax) = 0; /* Table will be extended on the first registration*/
797   CpvAccess(CmiHandlerTable) = NULL;
798   CmiRegisterHandler((CmiHandler)_cmiZeroHandler);
799 }
800
801
802 /******************************************************************************
803  *
804  * CmiTimer
805  *
806  * Here are two possible implementations of CmiTimer.  Some machines don't
807  * select either, and define the timer in machine.c instead.
808  *
809  *****************************************************************************/
810
811 #if CMK_HAS_ASCTIME
812
813 char *CmiPrintDate()
814 {
815   struct tm *local;
816   time_t t;
817
818   t = time(NULL);
819   local = localtime(&t);
820   return asctime(local);
821 }
822
823 #else
824
825 char *CmiPrintDate()
826 {
827   return "N/A";
828 }
829
830 #endif
831
832 static int _absoluteTime = 0;
833
834 #if CMK_TIMER_USE_TIMES
835
836 CpvStaticDeclare(double, clocktick);
837 CpvStaticDeclare(int,inittime_wallclock);
838 CpvStaticDeclare(int,inittime_virtual);
839
840 int CmiTimerIsSynchronized()
841 {
842   return 0;
843 }
844
845 int CmiTimerAbsolute()
846 {
847   return 0;
848 }
849
850 double CmiStartTimer()
851 {
852   return 0.0;
853 }
854
855 double CmiInitTime()
856 {
857   return CpvAccess(inittime_wallclock);
858 }
859
860 void CmiTimerInit(char **argv)
861 {
862   struct tms temp;
863   CpvInitialize(double, clocktick);
864   CpvInitialize(int, inittime_wallclock);
865   CpvInitialize(int, inittime_virtual);
866   CpvAccess(inittime_wallclock) = times(&temp);
867   CpvAccess(inittime_virtual) = temp.tms_utime + temp.tms_stime;
868   CpvAccess(clocktick) = 1.0 / (sysconf(_SC_CLK_TCK));
869 }
870
871 double CmiWallTimer()
872 {
873   struct tms temp;
874   double currenttime;
875   int now;
876
877   now = times(&temp);
878   currenttime = (now - CpvAccess(inittime_wallclock)) * CpvAccess(clocktick);
879   return (currenttime);
880 }
881
882 double CmiCpuTimer()
883 {
884   struct tms temp;
885   double currenttime;
886   int now;
887
888   times(&temp);
889   now = temp.tms_stime + temp.tms_utime;
890   currenttime = (now - CpvAccess(inittime_virtual)) * CpvAccess(clocktick);
891   return (currenttime);
892 }
893
894 double CmiTimer()
895 {
896   return CmiCpuTimer();
897 }
898
899 #endif
900
901 #if CMK_TIMER_USE_GETRUSAGE
902
903 #if CMK_SMP
904 # if CMK_HAS_RUSAGE_THREAD
905 #define RUSAGE_WHO        1   /* RUSAGE_THREAD, only in latest Linux kernels */
906 #else
907 #undef RUSAGE_WHO
908 #endif
909 #else
910 #define RUSAGE_WHO        0
911 #endif
912
913 static double inittime_wallclock;
914 CpvStaticDeclare(double, inittime_virtual);
915
916 int CmiTimerIsSynchronized()
917 {
918   return 0;
919 }
920
921 int CmiTimerAbsolute()
922 {
923   return _absoluteTime;
924 }
925
926 double CmiStartTimer()
927 {
928   return 0.0;
929 }
930
931 double CmiInitTime()
932 {
933   return inittime_wallclock;
934 }
935
936 void CmiTimerInit(char **argv)
937 {
938   struct timeval tv;
939   struct rusage ru;
940   CpvInitialize(double, inittime_virtual);
941
942   _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
943
944 #if !CMK_MEM_CHECKPOINT && !_FAULT_MLOG_ && !_FAULT_CAUSAL_
945   /* try to synchronize calling barrier */
946   CmiBarrier();
947   CmiBarrier();
948   CmiBarrier();
949 #endif
950
951   gettimeofday(&tv,0);
952   inittime_wallclock = (tv.tv_sec * 1.0) + (tv.tv_usec*0.000001);
953 #ifndef RUSAGE_WHO
954   CpvAccess(inittime_virtual) = inittime_wallclock;
955 #else
956   getrusage(RUSAGE_WHO, &ru); 
957   CpvAccess(inittime_virtual) =
958     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
959     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
960 #endif
961
962 #if ! CMK_MEM_CHECKPOINT && !_FAULT_MLOG_ && !_FAULT_CAUSAL_
963   CmiBarrier();
964 /*  CmiBarrierZero(); */
965 #endif
966 }
967
968 double CmiCpuTimer()
969 {
970 #ifndef RUSAGE_WHO
971   return CmiWallTimer();
972 #else
973   struct rusage ru;
974   double currenttime;
975
976   getrusage(RUSAGE_WHO, &ru);
977   currenttime =
978     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
979     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
980   
981   return currenttime - CpvAccess(inittime_virtual);
982 #endif
983 }
984
985 static double lastT = -1.0;
986
987 double CmiWallTimer()
988 {
989   struct timeval tv;
990   double currenttime;
991
992   gettimeofday(&tv,0);
993   currenttime = (tv.tv_sec * 1.0) + (tv.tv_usec * 0.000001);
994 #if CMK_ERROR_CHECKING
995   if (lastT > 0.0 && currenttime < lastT) {
996     currenttime = lastT;
997   }
998   lastT = currenttime;
999 #endif
1000   return _absoluteTime?currenttime:currenttime - inittime_wallclock;
1001 }
1002
1003 double CmiTimer()
1004 {
1005   return CmiCpuTimer();
1006 }
1007
1008 #endif
1009
1010 #if CMK_TIMER_USE_RDTSC
1011
1012 static double readMHz(void)
1013 {
1014   double x;
1015   char str[1000];
1016   char buf[100];
1017   FILE *fp;
1018   CmiLock(_smp_mutex);
1019   fp = fopen("/proc/cpuinfo", "r");
1020   if (fp != NULL)
1021   while(fgets(str, 1000, fp)!=0) {
1022     if(sscanf(str, "cpu MHz%[^:]",buf)==1)
1023     {
1024       char *s = strchr(str, ':'); s=s+1;
1025       sscanf(s, "%lf", &x);
1026       fclose(fp);
1027       CmiUnlock(_smp_mutex);
1028       return x;
1029     }
1030   }
1031   CmiUnlock(_smp_mutex);
1032   CmiAbort("Cannot read CPU MHz from /proc/cpuinfo file.");
1033   return 0.0;
1034 }
1035
1036 double _cpu_speed_factor;
1037 CpvStaticDeclare(double, inittime_virtual);
1038 CpvStaticDeclare(double, inittime_walltime);
1039
1040 double  CmiStartTimer(void)
1041 {
1042   return CpvAccess(inittime_walltime);
1043 }
1044
1045 double CmiInitTime()
1046 {
1047   return CpvAccess(inittime_walltime);
1048 }
1049
1050 void CmiTimerInit(char **argv)
1051 {
1052   struct rusage ru;
1053
1054   CmiBarrier();
1055   CmiBarrier();
1056
1057   _cpu_speed_factor = 1.0/(readMHz()*1.0e6); 
1058   rdtsc(); rdtsc(); rdtsc(); rdtsc(); rdtsc();
1059   CpvInitialize(double, inittime_walltime);
1060   CpvAccess(inittime_walltime) = CmiWallTimer();
1061   CpvInitialize(double, inittime_virtual);
1062   getrusage(0, &ru); 
1063   CpvAccess(inittime_virtual) =
1064     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
1065     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
1066
1067   CmiBarrierZero();
1068 }
1069
1070 double CmiCpuTimer()
1071 {
1072   struct rusage ru;
1073   double currenttime;
1074
1075   getrusage(0, &ru);
1076   currenttime =
1077     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
1078     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
1079   return currenttime - CpvAccess(inittime_virtual);
1080 }
1081
1082 #endif
1083
1084 #if CMK_BLUEGENEL || CMK_BLUEGENEP
1085 #include "dcopy.h"
1086 #endif
1087
1088 #if CMK_TIMER_USE_BLUEGENEL
1089
1090 #include "rts.h"
1091
1092 #if 0 
1093 #define SPRN_TBRL 0x10C  /* Time Base Read Lower Register (user & sup R/O) */
1094 #define SPRN_TBRU 0x10D  /* Time Base Read Upper Register (user & sup R/O) */
1095 #define SPRN_PIR  0x11E  /* CPU id */
1096
1097 static inline unsigned long long BGLTimebase(void)
1098 {
1099   unsigned volatile u1, u2, lo;
1100   union
1101   {
1102     struct { unsigned hi, lo; } w;
1103     unsigned long long d;
1104   } result;
1105                                                                                 
1106   do {
1107     asm volatile ("mfspr %0,%1" : "=r" (u1) : "i" (SPRN_TBRU));
1108     asm volatile ("mfspr %0,%1" : "=r" (lo) : "i" (SPRN_TBRL));
1109     asm volatile ("mfspr %0,%1" : "=r" (u2) : "i" (SPRN_TBRU));
1110   } while (u1!=u2);
1111                                                                                 
1112   result.w.lo = lo;
1113   result.w.hi = u2;
1114   return result.d;
1115 }
1116 #endif
1117
1118 static unsigned long long inittime_wallclock = 0;
1119 CpvStaticDeclare(double, clocktick);
1120
1121 int CmiTimerIsSynchronized()
1122 {
1123   return 0;
1124 }
1125
1126 int CmiTimerAbsolute()
1127 {
1128   return 1;
1129 }
1130
1131 double CmiStartTimer()
1132 {
1133   return 0.0;
1134 }
1135
1136 double CmiInitTime()
1137 {
1138   return inittime_wallclock;
1139 }
1140
1141 void CmiTimerInit(char **argv)
1142 {
1143   BGLPersonality dst;
1144   CpvInitialize(double, clocktick);
1145   int size = sizeof(BGLPersonality);
1146   rts_get_personality(&dst, size);
1147   CpvAccess(clocktick) = 1.0 / dst.clockHz;
1148
1149   /* try to synchronize calling barrier */
1150   CmiBarrier();
1151   CmiBarrier();
1152   CmiBarrier();
1153
1154   /* inittime_wallclock = rts_get_timebase(); */
1155   inittime_wallclock = 0.0;    /* use bgl absolute time */
1156 }
1157
1158 double CmiWallTimer()
1159 {
1160   unsigned long long currenttime;
1161   currenttime = rts_get_timebase();
1162   //return CpvAccess(clocktick)*(currenttime-inittime_wallclock);
1163   return CpvAccess(clocktick)*(currenttime);
1164 }
1165
1166 double CmiCpuTimer()
1167 {
1168   return CmiWallTimer();
1169 }
1170
1171 double CmiTimer()
1172 {
1173   return CmiWallTimer();
1174 }
1175
1176 #endif
1177
1178 #if CMK_TIMER_USE_BLUEGENEP  /* This module just compiles with GCC charm. */
1179
1180 int CmiTimerAbsolute()
1181 {
1182   return 0;
1183 }
1184
1185 double CmiStartTimer()
1186 {
1187   return 0.0;
1188 }
1189
1190 double CmiInitTime()
1191 {
1192   return 0.0;
1193 }
1194
1195 void CmiTimerInit(char **argv) {}
1196
1197 #include "dcmf.h"
1198
1199 double CmiWallTimer () {
1200   return DCMF_Timer();
1201 }
1202
1203 double CmiCpuTimer()
1204 {
1205   return CmiWallTimer();
1206 }
1207
1208 double CmiTimer()
1209 {
1210   return CmiWallTimer();
1211 }
1212 #endif
1213
1214
1215 #if CMK_TIMER_USE_BLUEGENEQ  /* This module just compiles with GCC charm. */
1216
1217 CpvStaticDeclare(unsigned long, inittime);
1218 CpvStaticDeclare(double, clocktick);
1219
1220 int CmiTimerIsSynchronized()
1221 {
1222   return 1;
1223 }
1224
1225 int CmiTimerAbsolute()
1226 {
1227   return 0;
1228 }
1229
1230 #include "hwi/include/bqc/A2_inlines.h"
1231 #include "spi/include/kernel/process.h"
1232
1233 double CmiStartTimer()
1234 {
1235   return 0.0;
1236 }
1237
1238 double CmiInitTime()
1239 {
1240   return CpvAccess(inittime);
1241 }
1242
1243 void CmiTimerInit(char **argv)
1244 {
1245   CpvInitialize(double, clocktick);
1246   CpvInitialize(unsigned long, inittime);
1247
1248   Personality_t  pers;
1249   Kernel_GetPersonality(&pers, sizeof(pers));
1250   uint32_t clockMhz = pers.Kernel_Config.FreqMHz;
1251   CpvAccess(clocktick) = 1.0 / (clockMhz * 1e6); 
1252
1253   /*fprintf(stderr, "Blue Gene/Q running at clock speed of %d Mhz\n", clockMhz);*/
1254
1255   /* try to synchronize calling barrier */
1256 #if !CMK_MEM_CHECKPOINT && !_FAULT_MLOG_ && !_FAULT_CAUSAL_
1257   CmiBarrier();
1258   CmiBarrier();
1259   CmiBarrier();
1260 #endif
1261   CpvAccess(inittime) = GetTimeBase (); 
1262 }
1263
1264 double CmiWallTimer()
1265 {
1266   unsigned long long currenttime;
1267   currenttime = GetTimeBase();
1268   return CpvAccess(clocktick)*(currenttime-CpvAccess(inittime));
1269 }
1270
1271 double CmiCpuTimer()
1272 {
1273   return CmiWallTimer();
1274 }
1275
1276 double CmiTimer()
1277 {
1278   return CmiWallTimer();
1279 }
1280
1281 #endif
1282
1283
1284 #if CMK_TIMER_USE_WIN32API
1285
1286 CpvStaticDeclare(double, inittime_wallclock);
1287 CpvStaticDeclare(double, inittime_virtual);
1288
1289 double CmiStartTimer()
1290 {
1291   return 0.0;
1292 }
1293
1294 int CmiTimerAbsolute()
1295 {
1296   return 0;
1297 }
1298
1299 double CmiInitTime()
1300 {
1301   return CpvAccess(inittime_wallclock);
1302 }
1303
1304 void CmiTimerInit(char **argv)
1305 {
1306 #ifdef __CYGWIN__
1307         struct timeb tv;
1308 #else
1309         struct _timeb tv;
1310 #endif
1311         clock_t       ru;
1312
1313         CpvInitialize(double, inittime_wallclock);
1314         CpvInitialize(double, inittime_virtual);
1315         _ftime(&tv);
1316         CpvAccess(inittime_wallclock) = tv.time*1.0 + tv.millitm*0.001;
1317         ru = clock();
1318         CpvAccess(inittime_virtual) = ((double) ru)/CLOCKS_PER_SEC;
1319 }
1320
1321 double CmiCpuTimer()
1322 {
1323         clock_t ru;
1324         double currenttime;
1325
1326         ru = clock();
1327         currenttime = (double) ru/CLOCKS_PER_SEC;
1328
1329         return currenttime - CpvAccess(inittime_virtual);
1330 }
1331
1332 double CmiWallTimer()
1333 {
1334 #ifdef __CYGWIN__
1335         struct timeb tv;
1336 #else
1337         struct _timeb tv;
1338 #endif
1339         double currenttime;
1340
1341         _ftime(&tv);
1342         currenttime = tv.time*1.0 + tv.millitm*0.001;
1343
1344         return currenttime - CpvAccess(inittime_wallclock);
1345 }
1346         
1347
1348 double CmiTimer()
1349 {
1350         return CmiCpuTimer();
1351 }
1352
1353 #endif
1354
1355 #if CMK_TIMER_USE_RTC
1356
1357 #if __crayx1
1358  /* For _rtc() on Cray X1 */
1359 #include <intrinsics.h>
1360 #endif
1361
1362 static double clocktick;
1363 CpvStaticDeclare(long long, inittime_wallclock);
1364
1365 double CmiStartTimer()
1366 {
1367   return 0.0;
1368 }
1369
1370 double CmiInitTime()
1371 {
1372   return CpvAccess(inittime_wallclock);
1373 }
1374
1375 void CmiTimerInit(char **argv)
1376 {
1377   CpvInitialize(long long, inittime_wallclock);
1378   CpvAccess(inittime_wallclock) = _rtc();
1379   clocktick = 1.0 / (double)(sysconf(_SC_SV2_USER_TIME_RATE));
1380 }
1381
1382 int CmiTimerAbsolute()
1383 {
1384   return 0;
1385 }
1386
1387 double CmiWallTimer()
1388 {
1389   long long now;
1390
1391   now = _rtc();
1392   return (clocktick * (now - CpvAccess(inittime_wallclock)));
1393 }
1394
1395 double CmiCpuTimer()
1396 {
1397   return CmiWallTimer();
1398 }
1399
1400 double CmiTimer()
1401 {
1402   return CmiCpuTimer();
1403 }
1404
1405 #endif
1406
1407 #if CMK_TIMER_USE_AIX_READ_TIME
1408
1409 #include <sys/time.h>
1410
1411 static timebasestruct_t inittime_wallclock;
1412 static double clocktick;
1413 CpvStaticDeclare(double, inittime_virtual);
1414
1415 double CmiStartTimer()
1416 {
1417   return 0.0;
1418 }
1419
1420 double CmiInitTime()
1421 {
1422   return inittime_wallclock;
1423 }
1424
1425 void CmiTimerInit(char **argv)
1426 {
1427   struct rusage ru;
1428
1429   if (CmiMyRank() == 0) {
1430     read_wall_time(&inittime_wallclock, TIMEBASE_SZ);
1431     time_base_to_time(&inittime_wallclock, TIMEBASE_SZ);
1432   }
1433
1434   CpvInitialize(double, inittime_virtual);
1435   getrusage(0, &ru);
1436   CpvAccess(inittime_virtual) =
1437     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
1438     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
1439 }
1440
1441 int CmiTimerAbsolute()
1442 {
1443   return 0;
1444 }
1445
1446 double CmiWallTimer()
1447 {
1448   int secs, n_secs;
1449   double curt;
1450   timebasestruct_t now;
1451   read_wall_time(&now, TIMEBASE_SZ);
1452   time_base_to_time(&now, TIMEBASE_SZ);
1453
1454   secs = now.tb_high - inittime_wallclock.tb_high;
1455   n_secs = now.tb_low - inittime_wallclock.tb_low;
1456   if (n_secs < 0)  {
1457     secs--;
1458     n_secs += 1000000000;
1459   }
1460   curt = secs*1.0 + n_secs*1e-9;
1461   return curt;
1462 }
1463
1464 double CmiCpuTimer()
1465 {
1466   struct rusage ru;
1467   double currenttime;
1468
1469   getrusage(0, &ru);
1470   currenttime =
1471     (ru.ru_utime.tv_sec * 1.0)+(ru.ru_utime.tv_usec * 0.000001) +
1472     (ru.ru_stime.tv_sec * 1.0)+(ru.ru_stime.tv_usec * 0.000001);
1473   return currenttime - CpvAccess(inittime_virtual);
1474 }
1475
1476 double CmiTimer()
1477 {
1478   return CmiWallTimer();
1479 }
1480
1481 #endif
1482
1483 #ifndef CMK_USE_SPECIAL_MESSAGE_QUEUE_CHECK
1484 /** Return 1 if our outgoing message queue 
1485    for this node is longer than this many bytes. */
1486 int CmiLongSendQueue(int forNode,int longerThanBytes) {
1487   return 0;
1488 }
1489 #endif
1490
1491 #if CMK_SIGNAL_USE_SIGACTION
1492 #include <signal.h>
1493 void CmiSignal(int sig1, int sig2, int sig3, void (*handler)())
1494 {
1495   struct sigaction in, out ;
1496   in.sa_handler = handler;
1497   sigemptyset(&in.sa_mask);
1498   if (sig1) sigaddset(&in.sa_mask, sig1);
1499   if (sig2) sigaddset(&in.sa_mask, sig2);
1500   if (sig3) sigaddset(&in.sa_mask, sig3);
1501   in.sa_flags = 0;
1502   if (sig1) if (sigaction(sig1, &in, &out)<0) exit(1);
1503   if (sig2) if (sigaction(sig2, &in, &out)<0) exit(1);
1504   if (sig3) if (sigaction(sig3, &in, &out)<0) exit(1);
1505 }
1506 #endif
1507
1508 #if CMK_SIGNAL_USE_SIGACTION_WITH_RESTART
1509 #include <signal.h>
1510 void CmiSignal(sig1, sig2, sig3, handler)
1511 int sig1, sig2, sig3;
1512 void (*handler)();
1513 {
1514   struct sigaction in, out ;
1515   in.sa_handler = handler;
1516   sigemptyset(&in.sa_mask);
1517   if (sig1) sigaddset(&in.sa_mask, sig1);
1518   if (sig2) sigaddset(&in.sa_mask, sig2);
1519   if (sig3) sigaddset(&in.sa_mask, sig3);
1520   in.sa_flags = SA_RESTART;
1521   if (sig1) if (sigaction(sig1, &in, &out)<0) exit(1);
1522   if (sig2) if (sigaction(sig2, &in, &out)<0) exit(1);
1523   if (sig3) if (sigaction(sig3, &in, &out)<0) exit(1);
1524 }
1525 #endif
1526
1527 /** 
1528  *  @addtogroup ConverseScheduler
1529  *  @{
1530  */
1531
1532 /*****************************************************************************
1533  * 
1534  * The following is the CsdScheduler function.  A common
1535  * implementation is provided below.  The machine layer can provide an
1536  * alternate implementation if it so desires.
1537  *
1538  * void CmiDeliversInit()
1539  *
1540  *      - CmiInit promises to call this before calling CmiDeliverMsgs
1541  *        or any of the other functions in this section.
1542  *
1543  * int CmiDeliverMsgs(int maxmsgs)
1544  *
1545  *      - CmiDeliverMsgs will retrieve up to maxmsgs that were transmitted
1546  *        with the Cmi, and will invoke their handlers.  It does not wait
1547  *        if no message is unavailable.  Instead, it returns the quantity
1548  *        (maxmsgs-delivered), where delivered is the number of messages it
1549  *        delivered.
1550  *
1551  * void CmiDeliverSpecificMsg(int handlerno)
1552  *
1553  *      - Waits for a message with the specified handler to show up, then
1554  *        invokes the message's handler.  Note that unlike CmiDeliverMsgs,
1555  *        This function _does_ wait.
1556  *
1557  * For this common implementation to work, the machine layer must provide the
1558  * following:
1559  *
1560  * void *CmiGetNonLocal()
1561  *
1562  *      - returns a message just retrieved from some other PE, not from
1563  *        local.  If no such message exists, returns 0.
1564  *
1565  * CpvExtern(CdsFifo, CmiLocalQueue);
1566  *
1567  *      - a FIFO queue containing all messages from the local processor.
1568  *
1569  *****************************************************************************/
1570
1571 void CsdBeginIdle(void)
1572 {
1573   CcdCallBacks();
1574 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
1575   _LOG_E_PROC_IDLE();   /* projector */
1576 #endif
1577
1578   CpvAccess(cmiMyPeIdle) = 1;
1579
1580   CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE) ;
1581 }
1582
1583 void CsdStillIdle(void)
1584 {
1585   CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE);
1586 }
1587
1588 void CsdEndIdle(void)
1589 {
1590 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
1591   _LOG_E_PROC_BUSY();   /* projector */
1592 #endif
1593
1594   CpvAccess(cmiMyPeIdle) = 0;
1595
1596   CcdRaiseCondition(CcdPROCESSOR_BEGIN_BUSY) ;
1597 }
1598
1599 extern int _exitHandlerIdx;
1600
1601 /** Takes a message and calls its corresponding handler. */
1602 void CmiHandleMessage(void *msg)
1603 {
1604 /* this is wrong because it counts the Charm++ messages in sched queue
1605         CpvAccess(cQdState)->mProcessed++;
1606 */
1607         CmiHandlerInfo *h;
1608 #if CMK_TRACE_ENABLED && CMK_PROJECTOR
1609         CmiUInt2 handler=CmiGetHandler(msg); /* Save handler for use after msg is gone */
1610         _LOG_E_HANDLER_BEGIN(handler); /* projector */
1611         /* setMemoryStatus(1) */ /* charmdebug */
1612 #endif
1613
1614 /*
1615         FAULT_EVAC
1616 */
1617 /*      if((!CpvAccess(_validProcessors)[CmiMyPe()]) && handler != _exitHandlerIdx){
1618                 return;
1619         }*/
1620         
1621         MESSAGE_PHASE_CHECK(msg)
1622
1623         h=&CmiGetHandlerInfo(msg);
1624         (h->hdlr)(msg,h->userPtr);
1625 #if CMK_TRACE_ENABLED
1626         /* setMemoryStatus(0) */ /* charmdebug */
1627         //_LOG_E_HANDLER_END(handler);  /* projector */
1628 #endif
1629 }
1630
1631 #if CMK_CMIDELIVERS_USE_COMMON_CODE
1632
1633 void CmiDeliversInit()
1634 {
1635 }
1636
1637 int CmiDeliverMsgs(int maxmsgs)
1638 {
1639   return CsdScheduler(maxmsgs);
1640 }
1641
1642 #if CMK_OBJECT_QUEUE_AVAILABLE
1643 CpvDeclare(void *, CsdObjQueue);
1644 #endif
1645
1646 void CsdSchedulerState_new(CsdSchedulerState_t *s)
1647 {
1648 #if CMK_OBJECT_QUEUE_AVAILABLE
1649         s->objQ=CpvAccess(CsdObjQueue);
1650 #endif
1651         s->localQ=CpvAccess(CmiLocalQueue);
1652         s->schedQ=CpvAccess(CsdSchedQueue);
1653         s->localCounter=&(CpvAccess(CsdLocalCounter));
1654 #if CMK_NODE_QUEUE_AVAILABLE
1655         s->nodeQ=CsvAccess(CsdNodeQueue);
1656         s->nodeLock=CsvAccess(CsdNodeQueueLock);
1657 #endif
1658 #if CMK_GRID_QUEUE_AVAILABLE
1659         s->gridQ=CpvAccess(CsdGridQueue);
1660 #endif
1661 }
1662
1663
1664 /** Dequeue and return the next message from the unprocessed message queues.
1665  *
1666  * This function encapsulates the multiple queues that exist for holding unprocessed
1667  * messages and the rules for the order in which to check them. There are five (5)
1668  * different Qs that converse uses to store and retrieve unprocessed messages. These
1669  * are:
1670  *     Q Purpose                  Type      internal DeQ logic
1671  * -----------------------------------------------------------
1672  * - PE offnode                   pcQ             FIFO
1673  * - PE onnode                    CkQ             FIFO
1674  * - Node offnode                 pcQ             FIFO
1675  * - Node onnode                  prioQ           prio-based
1676  * - Scheduler                    prioQ           prio-based
1677  *
1678  * The PE queues hold messages that are destined for a specific PE. There is one such
1679  * queue for every PE within a charm node. The node queues hold messages that are
1680  * destined to that node. There is only one of each node queue within a charm node.
1681  * Finally there is also a charm++ message queue for each PE.
1682  *
1683  * The offnode queues are meant for holding messages that arrive from outside the
1684  * node. The onnode queues hold messages that are generated within the same charm
1685  * node.
1686  *
1687  * The PE and node level offnode queues are accessed via functions CmiGetNonLocal()
1688  * and CmiGetNonLocalNodeQ(). These are implemented separately by each machine layer
1689  * and hide the implementation specifics for each layer.
1690  *
1691  * The PE onnode queue is implemented as a FIFO CkQ and is initialized via a call to
1692  * CdsFifo_Create(). The node local queue and the scheduler queue are both priority
1693  * queues. They are initialized via calls to CqsCreate() which gives each of them
1694  * three separate internal queues for different priority ranges (-ve, 0 and +ve).
1695  * Access to these queues is via pointers stored in the struct CsdSchedulerState that
1696  * is passed into this function.
1697  *
1698  * The order in which these queues are checked is described below. The function
1699  * proceeds to the next queue in the list only if it does not find any messages in
1700  * the current queue. The first message that is found is returned, terminating the
1701  * call.
1702  * (1) offnode queue for this PE
1703  * (2) onnode queue for this PE
1704  * (3) offnode queue for this node
1705  * (4) highest priority msg from onnode queue or scheduler queue
1706  *
1707  * @note: Across most (all?) machine layers, the two GetNonLocal functions simply
1708  * access (after observing adequate locking rigor) structs representing the scheduler
1709  * state, to dequeue from the queues stored within them. The structs (CmiStateStruct
1710  * and CmiNodeStateStruct) implement these queues as \ref Machine "pc (producer-consumer)
1711  * queues". The functions also perform other necessary actions like PumpMsgs() etc.
1712  *
1713  */
1714 void *CsdNextMessage(CsdSchedulerState_t *s) {
1715         void *msg;
1716         if((*(s->localCounter))-- >0)
1717           {
1718               /* This avoids a race condition with migration detected by megatest*/
1719               msg=CdsFifo_Dequeue(s->localQ);
1720               if (msg!=NULL)
1721                 {
1722                   CpvAccess(cQdState)->mProcessed++;
1723                   return msg;       
1724                 }
1725               CqsDequeue(s->schedQ,(void **)&msg);
1726               if (msg!=NULL) return msg;
1727           }
1728         
1729         *(s->localCounter)=CsdLocalMax;
1730         if ( NULL!=(msg=CmiGetNonLocal()) || 
1731              NULL!=(msg=CdsFifo_Dequeue(s->localQ)) ) {
1732             CpvAccess(cQdState)->mProcessed++;
1733             return msg;
1734         }
1735 #if CMK_GRID_QUEUE_AVAILABLE
1736         /*#warning "CsdNextMessage: CMK_GRID_QUEUE_AVAILABLE" */
1737         CqsDequeue (s->gridQ, (void **) &msg);
1738         if (msg != NULL) {
1739           return (msg);
1740         }
1741 #endif
1742 #if CMK_NODE_QUEUE_AVAILABLE
1743         /*#warning "CsdNextMessage: CMK_NODE_QUEUE_AVAILABLE" */
1744         if (NULL!=(msg=CmiGetNonLocalNodeQ())) return msg;
1745         if (!CqsEmpty(s->nodeQ)
1746          && !CqsPrioGT(CqsGetPriority(s->nodeQ),
1747                        CqsGetPriority(s->schedQ))) {
1748           CmiLock(s->nodeLock);
1749           CqsDequeue(s->nodeQ,(void **)&msg);
1750           CmiUnlock(s->nodeLock);
1751           if (msg!=NULL) return msg;
1752         }
1753 #endif
1754 #if CMK_OBJECT_QUEUE_AVAILABLE
1755         /*#warning "CsdNextMessage: CMK_OBJECT_QUEUE_AVAILABLE"   */
1756         if (NULL!=(msg=CdsFifo_Dequeue(s->objQ))) {
1757           return msg;
1758         }
1759 #endif
1760         if(!CsdLocalMax) {
1761           CqsDequeue(s->schedQ,(void **)&msg);
1762           if (msg!=NULL) return msg;        
1763         }
1764         return NULL;
1765 }
1766
1767
1768 void *CsdNextLocalNodeMessage(CsdSchedulerState_t *s) {
1769         void *msg;
1770 #if CMK_NODE_QUEUE_AVAILABLE
1771         /*#warning "CsdNextMessage: CMK_NODE_QUEUE_AVAILABLE" */
1772         /*if (NULL!=(msg=CmiGetNonLocalNodeQ())) return msg;*/
1773         if (!CqsEmpty(s->nodeQ))
1774         {
1775           CmiLock(s->nodeLock);
1776           CqsDequeue(s->nodeQ,(void **)&msg);
1777           CmiUnlock(s->nodeLock);
1778           if (msg!=NULL) return msg;
1779         }
1780 #endif
1781         return NULL;
1782
1783 }
1784
1785 int CsdScheduler(int maxmsgs)
1786 {
1787         if (maxmsgs<0) CsdScheduleForever();    
1788         else if (maxmsgs==0)
1789                 CsdSchedulePoll();
1790         else /*(maxmsgs>0)*/ 
1791                 return CsdScheduleCount(maxmsgs);
1792         return 0;
1793 }
1794
1795 /*Declare the standard scheduler housekeeping*/
1796 #define SCHEDULE_TOP \
1797       void *msg;\
1798       int *CsdStopFlag_ptr = &CpvAccess(CsdStopFlag); \
1799       int cycle = CpvAccess(CsdStopFlag); \
1800       CsdSchedulerState_t state;\
1801       CsdSchedulerState_new(&state);
1802
1803 /*A message is available-- process it*/
1804 #define SCHEDULE_MESSAGE \
1805       CmiHandleMessage(msg);\
1806       if (*CsdStopFlag_ptr != cycle) break;
1807
1808 /*No message available-- go (or remain) idle*/
1809 #define SCHEDULE_IDLE \
1810       if (!isIdle) {isIdle=1;CsdBeginIdle();}\
1811       else CsdStillIdle();\
1812       if (*CsdStopFlag_ptr != cycle) {\
1813         CsdEndIdle();\
1814         break;\
1815       }
1816
1817 /*
1818         EVAC
1819 */
1820 extern void CkClearAllArrayElements();
1821
1822
1823 extern void machine_OffloadAPIProgress();
1824
1825 /** The main scheduler loop that repeatedly executes messages from a queue, forever. */
1826 void CsdScheduleForever(void)
1827 {
1828   #if CMK_CELL
1829     #define CMK_CELL_PROGRESS_FREQ  96  /* (MSG-Q Entries x1.5) */
1830     int progressCount = CMK_CELL_PROGRESS_FREQ;
1831   #endif
1832
1833   #ifdef CMK_CUDA
1834     #define CMK_CUDA_PROGRESS_FREQ 50
1835     int cudaProgressCount = CMK_CUDA_PROGRESS_FREQ;
1836   #endif
1837
1838   int isIdle=0;
1839   SCHEDULE_TOP
1840   while (1) {
1841     /* The interoperation will cost this little overhead in scheduling */
1842     if(CharmLibInterOperate) {
1843       if(CpvAccess(charmLibExitFlag)) {
1844         CpvAccess(charmLibExitFlag) = 0;
1845         break;
1846       }
1847     }
1848     msg = CsdNextMessage(&state);
1849     if (msg!=NULL) { /*A message is available-- process it*/
1850       if (isIdle) {isIdle=0;CsdEndIdle();}
1851       SCHEDULE_MESSAGE
1852
1853       #if CMK_CELL
1854         if (progressCount <= 0) {
1855           /*OffloadAPIProgress();*/
1856           machine_OffloadAPIProgress();
1857           progressCount = CMK_CELL_PROGRESS_FREQ;
1858         }
1859         progressCount--;
1860       #endif
1861
1862       #ifdef CMK_CUDA
1863         if (cudaProgressCount == 0) {
1864           gpuProgressFn(); 
1865           cudaProgressCount = CMK_CUDA_PROGRESS_FREQ; 
1866         }
1867         cudaProgressCount--; 
1868       #endif
1869
1870     } else { /*No message available-- go (or remain) idle*/
1871       SCHEDULE_IDLE
1872
1873       #if CMK_CELL
1874         /*OffloadAPIProgress();*/
1875         machine_OffloadAPIProgress();
1876         progressCount = CMK_CELL_PROGRESS_FREQ;
1877       #endif
1878
1879       #ifdef CMK_CUDA
1880         gpuProgressFn(); 
1881         cudaProgressCount = CMK_CUDA_PROGRESS_FREQ;
1882       #endif
1883
1884     }
1885     CsdPeriodic();
1886   }
1887 }
1888 int CsdScheduleCount(int maxmsgs)
1889 {
1890   int isIdle=0;
1891   SCHEDULE_TOP
1892   while (1) {
1893     msg = CsdNextMessage(&state);
1894     if (msg!=NULL) { /*A message is available-- process it*/
1895       if (isIdle) {isIdle=0;CsdEndIdle();}
1896       maxmsgs--; 
1897       SCHEDULE_MESSAGE
1898       if (maxmsgs==0) break;
1899     } else { /*No message available-- go (or remain) idle*/
1900       SCHEDULE_IDLE
1901     }
1902     CsdPeriodic();
1903   }
1904   return maxmsgs;
1905 }
1906
1907 void CsdSchedulePoll(void)
1908 {
1909   SCHEDULE_TOP
1910   while (1)
1911   {
1912         CsdPeriodic();
1913         /*CmiMachineProgressImpl(); ??? */
1914         if (NULL!=(msg = CsdNextMessage(&state)))
1915         {
1916              SCHEDULE_MESSAGE 
1917         }
1918         else break;
1919   }
1920 }
1921
1922 void CsdScheduleNodePoll(void)
1923 {
1924   SCHEDULE_TOP
1925   while (1)
1926   {
1927         /*CsdPeriodic();*/
1928         /*CmiMachineProgressImpl(); ??? */
1929         if (NULL!=(msg = CsdNextLocalNodeMessage(&state)))
1930         {
1931              SCHEDULE_MESSAGE 
1932         }
1933         else break;
1934   }
1935 }
1936
1937 void CmiDeliverSpecificMsg(handler)
1938 int handler;
1939 {
1940   int *msg; int side;
1941   void *localqueue = CpvAccess(CmiLocalQueue);
1942  
1943   side = 0;
1944   while (1) {
1945     CsdPeriodic();
1946     side ^= 1;
1947     if (side) msg = CmiGetNonLocal();
1948     else      msg = CdsFifo_Dequeue(localqueue);
1949     if (msg) {
1950       if (CmiGetHandler(msg)==handler) {
1951         CpvAccess(cQdState)->mProcessed++;
1952         CmiHandleMessage(msg);
1953         return;
1954       } else {
1955         CdsFifo_Enqueue(localqueue, msg);
1956       }
1957     }
1958   }
1959 }
1960  
1961 #endif /* CMK_CMIDELIVERS_USE_COMMON_CODE */
1962
1963 /***************************************************************************
1964  *
1965  * Standin Schedulers.
1966  *
1967  * We use the following strategy to make sure somebody's always running
1968  * the scheduler (CsdScheduler).  Initially, we assume the main thread
1969  * is responsible for this.  If the main thread blocks, we create a
1970  * "standin scheduler" thread to replace it.  If the standin scheduler
1971  * blocks, we create another standin scheduler to replace that one,
1972  * ad infinitum.  Collectively, the main thread and all the standin
1973  * schedulers are called "scheduling threads".
1974  *
1975  * Suppose the main thread is blocked waiting for data, and a standin
1976  * scheduler is running instead.  Suppose, then, that the data shows
1977  * up and the main thread is CthAwakened.  This causes a token to be
1978  * pushed into the queue.  When the standin pulls the token from the
1979  * queue and handles it, the standin goes to sleep, and control shifts
1980  * back to the main thread.  In this way, unnecessary standins are put
1981  * back to sleep.  These sleeping standins are stored on the
1982  * CthSleepingStandins list.
1983  *
1984  ***************************************************************************/
1985
1986 CpvStaticDeclare(CthThread, CthMainThread);
1987 CpvStaticDeclare(CthThread, CthSchedulingThread);
1988 CpvStaticDeclare(CthThread, CthSleepingStandins);
1989 CpvDeclare(int      , CthResumeNormalThreadIdx);
1990 CpvStaticDeclare(int      , CthResumeSchedulingThreadIdx);
1991
1992
1993 void CthStandinCode()
1994 {
1995   while (1) CsdScheduler(0);
1996 }
1997
1998 /* this fix the function pointer for thread migration and pup */
1999 static CthThread CthSuspendNormalThread()
2000 {
2001   return CpvAccess(CthSchedulingThread);
2002 }
2003
2004 void CthEnqueueSchedulingThread(CthThreadToken *token, int, int, unsigned int*);
2005 CthThread CthSuspendSchedulingThread();
2006
2007 CthThread CthSuspendSchedulingThread()
2008 {
2009   CthThread succ = CpvAccess(CthSleepingStandins);
2010
2011   if (succ) {
2012     CpvAccess(CthSleepingStandins) = CthGetNext(succ);
2013   } else {
2014     succ = CthCreate(CthStandinCode, 0, 256000);
2015     CthSetStrategy(succ,
2016                    CthEnqueueSchedulingThread,
2017                    CthSuspendSchedulingThread);
2018   }
2019   
2020   CpvAccess(CthSchedulingThread) = succ;
2021   return succ;
2022 }
2023
2024 /* Notice: For changes to the following function, make sure the function CthResumeNormalThreadDebug is also kept updated. */
2025 void CthResumeNormalThread(CthThreadToken* token)
2026 {
2027   CthThread t = token->thread;
2028
2029   /* BIGSIM_OOC DEBUGGING
2030   CmiPrintf("Resume normal thread with token[%p] ==> thread[%p]\n", token, t);
2031   */
2032
2033   if(t == NULL){
2034     free(token);
2035     return;
2036   }
2037 #if CMK_TRACE_ENABLED
2038 #if ! CMK_TRACE_IN_CHARM
2039   if(CpvAccess(traceOn))
2040     CthTraceResume(t);
2041 /*    if(CpvAccess(_traceCoreOn)) 
2042                 resumeTraceCore();*/
2043 #endif
2044 #endif
2045   
2046   /* BIGSIM_OOC DEBUGGING
2047   CmiPrintf("In CthResumeNormalThread:   ");
2048   CthPrintThdMagic(t);
2049   */
2050
2051 //  if(CmiMyPartition()==0&& CmiMyPe()==1)
2052 //    CmiPrintf("[%d][%d]In CthResumeNormalThread:\n",CmiMyPartition(),CmiMyPe());
2053   fflush(stdout);
2054   CthResume(t);
2055 //  if(CmiMyPartition()==0&& CmiMyPe()==1)
2056 //    CmiPrintf("[%d][%d]In CthResumeNormalThread end:\n",CmiMyPartition(),CmiMyPe());
2057   fflush(stdout);
2058 }
2059
2060 void CthResumeSchedulingThread(CthThreadToken  *token)
2061 {
2062   CthThread t = token->thread;
2063   CthThread me = CthSelf();
2064   if (me == CpvAccess(CthMainThread)) {
2065     CthEnqueueSchedulingThread(CthGetToken(me),CQS_QUEUEING_FIFO, 0, 0);
2066   } else {
2067     CthSetNext(me, CpvAccess(CthSleepingStandins));
2068     CpvAccess(CthSleepingStandins) = me;
2069   }
2070   CpvAccess(CthSchedulingThread) = t;
2071 #if CMK_TRACE_ENABLED
2072 #if ! CMK_TRACE_IN_CHARM
2073   if(CpvAccess(traceOn))
2074     CthTraceResume(t);
2075 /*    if(CpvAccess(_traceCoreOn)) 
2076                 resumeTraceCore();*/
2077 #endif
2078 #endif
2079   CthResume(t);
2080 }
2081
2082 void CthEnqueueNormalThread(CthThreadToken* token, int s, 
2083                                    int pb,unsigned int *prio)
2084 {
2085   CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx));
2086   CsdEnqueueGeneral(token, s, pb, prio);
2087 }
2088
2089 void CthEnqueueSchedulingThread(CthThreadToken* token, int s, 
2090                                        int pb,unsigned int *prio)
2091 {
2092   CmiSetHandler(token, CpvAccess(CthResumeSchedulingThreadIdx));
2093   CsdEnqueueGeneral(token, s, pb, prio);
2094 }
2095
2096 void CthSetStrategyDefault(CthThread t)
2097 {
2098   CthSetStrategy(t,
2099                  CthEnqueueNormalThread,
2100                  CthSuspendNormalThread);
2101 }
2102
2103 void CthSchedInit()
2104 {
2105   CpvInitialize(CthThread, CthMainThread);
2106   CpvInitialize(CthThread, CthSchedulingThread);
2107   CpvInitialize(CthThread, CthSleepingStandins);
2108   CpvInitialize(int      , CthResumeNormalThreadIdx);
2109   CpvInitialize(int      , CthResumeSchedulingThreadIdx);
2110
2111   CpvAccess(CthMainThread) = CthSelf();
2112   CpvAccess(CthSchedulingThread) = CthSelf();
2113   CpvAccess(CthSleepingStandins) = 0;
2114   CpvAccess(CthResumeNormalThreadIdx) =
2115     CmiRegisterHandler((CmiHandler)CthResumeNormalThread);
2116   CpvAccess(CthResumeSchedulingThreadIdx) =
2117     CmiRegisterHandler((CmiHandler)CthResumeSchedulingThread);
2118   CthSetStrategy(CthSelf(),
2119                  CthEnqueueSchedulingThread,
2120                  CthSuspendSchedulingThread);
2121 }
2122
2123 void CsdInit(argv)
2124   char **argv;
2125 {
2126   CpvInitialize(void *, CsdSchedQueue);
2127   CpvInitialize(int,   CsdStopFlag);
2128   CpvInitialize(int,   CsdLocalCounter);
2129   if(!CmiGetArgIntDesc(argv,"+csdLocalMax",&CsdLocalMax,"Set the max number of local messages to process before forcing a check for remote messages."))
2130     {
2131       CsdLocalMax= CSD_LOCAL_MAX_DEFAULT;
2132     }
2133   CpvAccess(CsdLocalCounter) = CsdLocalMax;
2134   CpvAccess(CsdSchedQueue) = (void *)CqsCreate();
2135    #if CMK_USE_STL_MSGQ
2136    if (CmiMyPe() == 0) CmiPrintf("Charm++> Using STL-based msgQ:\n");
2137    #endif
2138
2139 #if CMK_OBJECT_QUEUE_AVAILABLE
2140   CpvInitialize(void *,CsdObjQueue);
2141   CpvAccess(CsdObjQueue) = CdsFifo_Create();
2142 #endif
2143
2144 #if CMK_NODE_QUEUE_AVAILABLE
2145   CsvInitialize(CmiLock, CsdNodeQueueLock);
2146   CsvInitialize(void *, CsdNodeQueue);
2147   if (CmiMyRank() ==0) {
2148         CsvAccess(CsdNodeQueueLock) = CmiCreateLock();
2149         CsvAccess(CsdNodeQueue) = (void *)CqsCreate();
2150   }
2151   CmiNodeAllBarrier();
2152 #endif
2153
2154 #if CMK_GRID_QUEUE_AVAILABLE
2155   CsvInitialize(void *, CsdGridQueue);
2156   CpvAccess(CsdGridQueue) = (void *)CqsCreate();
2157 #endif
2158
2159   CpvAccess(CsdStopFlag)  = 0;
2160 }
2161
2162
2163
2164 /** 
2165  *  @}
2166  */
2167
2168
2169 /*****************************************************************************
2170  *
2171  * Vector Send
2172  *
2173  * The last parameter "system" is by default at zero, in which case the normal
2174  * messages are sent. If it is set to 1, the CmiChunkHeader prepended to every
2175  * CmiAllocced message will also be sent (except for the first one). Useful for
2176  * AllToAll communication, and other system features. If system is 1, also all
2177  * the messages will be padded to 8 bytes. Thus, the caller must be aware of
2178  * that.
2179  *
2180  ****************************************************************************/
2181
2182 #if CMK_VECTOR_SEND_USES_COMMON_CODE
2183
2184 void CmiSyncVectorSend(int destPE, int n, int *sizes, char **msgs) {
2185   int total;
2186   char *mesg;
2187 #if CMK_USE_IBVERBS
2188   VECTOR_COMPACT(total, mesg, n, sizes, msgs,sizeof(infiCmiChunkHeader));
2189 #else
2190   VECTOR_COMPACT(total, mesg, n, sizes, msgs,sizeof(CmiChunkHeader));
2191 #endif  
2192   CmiSyncSendAndFree(destPE, total, mesg);
2193 }
2194
2195 CmiCommHandle CmiASyncVectorSend(int destPE, int n, int *sizes, char **msgs) {
2196   CmiSyncVectorSend(destPE, n, sizes, msgs);
2197   return NULL;
2198 }
2199
2200 void CmiSyncVectorSendAndFree(int destPE, int n, int *sizes, char **msgs) {
2201   int i;
2202   CmiSyncVectorSend(destPE, n, sizes, msgs);
2203   for(i=0;i<n;i++) CmiFree(msgs[i]);
2204   CmiFree(sizes);
2205   CmiFree(msgs);
2206 }
2207
2208 #endif
2209
2210 /*****************************************************************************
2211  *
2212  * Reduction management
2213  *
2214  * Only one reduction can be active at a single time in the program.
2215  * Moreover, since every call is supposed to pass in the same arguments,
2216  * having some static variables is not a problem for multithreading.
2217  * 
2218  * Except for "data" and "size", all the other parameters (which are all function
2219  * pointers) MUST be the same in every processor. Having different processors
2220  * pass in different function pointers results in an undefined behaviour.
2221  * 
2222  * The data passed in to CmiReduce and CmiNodeReduce is deleted by the system,
2223  * and MUST be allocated with CmiAlloc. The data passed in to the "Struct"
2224  * functions is deleted with the provided function, or it is left intact if no
2225  * function is specified.
2226  * 
2227  * The destination handler for the the first form MUST be embedded into the
2228  * message's header.
2229  * 
2230  * The pup function is used to pup the input data structure into a message to
2231  * be sent to the parent processor. This pup routine is currently used only
2232  * for sizing and packing, NOT unpacking. It MUST be non-null.
2233  * 
2234  * The merge function receives as first parameter the input "data", being it
2235  * a message or a complex data structure (it is up to the user to interpret it
2236  * correctly), and a list of incoming (packed) messages from the children.
2237  * The merge function is responsible to delete "data" if this is no longer needed.
2238  * The system will be in charge of deleting the messages passed in as the second
2239  * argument, and the return value of the function (using the provided deleteFn in
2240  * the second version, or CmiFree in the first). The merge function can return
2241  * data if the merge can be performed in-place. It MUST be non-null.
2242  * 
2243  * At the destination, on processor zero, the final data returned by the last
2244  * merge call will not be deleted by the system, and the CmiHandler function
2245  * will be in charge of its deletion.
2246  * 
2247  * CmiReduce/CmiReduceStruct MUST be called once by every processor,
2248  * CmiNodeReduce/CmiNodeReduceStruct MUST be called once by every node, and in
2249  * particular by the rank zero in each node.
2250  ****************************************************************************/
2251
2252 CpvStaticDeclare(int, CmiReductionMessageHandler);
2253 CpvStaticDeclare(int, CmiReductionDynamicRequestHandler);
2254
2255 CpvStaticDeclare(CmiReduction**, _reduce_info);
2256 CpvStaticDeclare(int, _reduce_info_size); /* This is the log2 of the size of the array */
2257 CpvStaticDeclare(CmiUInt2, _reduce_seqID_global); /* This is used only by global reductions */
2258 CpvStaticDeclare(CmiUInt2, _reduce_seqID_request);
2259 CpvStaticDeclare(CmiUInt2, _reduce_seqID_dynamic);
2260
2261 enum {
2262   CmiReductionID_globalOffset = 0, /* Reductions that involve the whole set of processors */
2263   CmiReductionID_requestOffset = 1, /* Reductions IDs that are requested by all the processors (i.e during intialization) */
2264   CmiReductionID_dynamicOffset = 2, /* Reductions IDs that are requested by only one processor (typically at runtime) */
2265   CmiReductionID_multiplier = 3
2266 };
2267
2268 CmiReduction* CmiGetReductionCreate(int id, short int numChildren) {
2269   int index = id & ~((~0)<<CpvAccess(_reduce_info_size));
2270   CmiReduction *red = CpvAccess(_reduce_info)[index];
2271   if (red != NULL && red->seqID != id) {
2272     /* The table needs to be expanded */
2273     CmiAbort("Too many simultaneous reductions");
2274   }
2275   if (red == NULL || red->numChildren < numChildren) {
2276     CmiReduction *newred;
2277     if (red != NULL) CmiPrintf("[%d] Reduction structure reallocated\n",CmiMyPe());
2278     CmiAssert(red == NULL || red->localContributed == 0);
2279     if (numChildren == 0) numChildren = 4;
2280     newred = (CmiReduction*)malloc(sizeof(CmiReduction)+numChildren*sizeof(void*));
2281     newred->numRemoteReceived = 0;
2282     newred->localContributed = 0;
2283     newred->seqID = id;
2284     if (red != NULL) {
2285       memcpy(newred, red, sizeof(CmiReduction)+red->numChildren*sizeof(void*));
2286       free(red);
2287     }
2288     red = newred;
2289     red->numChildren = numChildren;
2290     red->remoteData = (char**)(red+1);
2291     CpvAccess(_reduce_info)[index] = red;
2292   }
2293   return red;
2294 }
2295
2296 CmiReduction* CmiGetReduction(int id) {
2297   return CmiGetReductionCreate(id, 0);
2298 }
2299
2300 void CmiClearReduction(int id) {
2301   int index = id & ~((~0)<<CpvAccess(_reduce_info_size));
2302   free(CpvAccess(_reduce_info)[index]);
2303   CpvAccess(_reduce_info)[index] = NULL;
2304 }
2305
2306 CmiReduction* CmiGetNextReduction(short int numChildren) {
2307   int id = CpvAccess(_reduce_seqID_global);
2308   CpvAccess(_reduce_seqID_global) += CmiReductionID_multiplier;
2309   if (id > 0xFFF0) CpvAccess(_reduce_seqID_global) = CmiReductionID_globalOffset;
2310   return CmiGetReductionCreate(id, numChildren);
2311 }
2312
2313 CmiReductionID CmiGetGlobalReduction() {
2314   return CpvAccess(_reduce_seqID_request)+=CmiReductionID_multiplier;
2315 }
2316
2317 CmiReductionID CmiGetDynamicReduction() {
2318   if (CmiMyPe() != 0) CmiAbort("Cannot call CmiGetDynamicReduction on processors other than zero!\n");
2319   return CpvAccess(_reduce_seqID_dynamic)+=CmiReductionID_multiplier;
2320 }
2321
2322 void CmiReductionHandleDynamicRequest(char *msg) {
2323   int *values = (int*)(msg+CmiMsgHeaderSizeBytes);
2324   int pe = values[0];
2325   int size = CmiMsgHeaderSizeBytes+2*sizeof(int)+values[1];
2326   values[0] = CmiGetDynamicReduction();
2327   CmiSetHandler(msg, CmiGetXHandler(msg));
2328   if (pe >= 0) {
2329     CmiSyncSendAndFree(pe, size, msg);
2330   } else {
2331     CmiSyncBroadcastAllAndFree(size, msg);
2332   }
2333 }
2334
2335 void CmiGetDynamicReductionRemote(int handlerIdx, int pe, int dataSize, void *data) {
2336   int size = CmiMsgHeaderSizeBytes+2*sizeof(int)+dataSize;
2337   char *msg = (char*)CmiAlloc(size);
2338   int *values = (int*)(msg+CmiMsgHeaderSizeBytes);
2339   values[0] = pe;
2340   values[1] = dataSize;
2341   CmiSetXHandler(msg, handlerIdx);
2342   if (dataSize) memcpy(msg+CmiMsgHeaderSizeBytes+2*sizeof(int), data, dataSize);
2343   if (CmiMyPe() == 0) {
2344     CmiReductionHandleDynamicRequest(msg);
2345   } else {
2346     /* send the request to processor 0 */
2347     CmiSetHandler(msg, CpvAccess(CmiReductionDynamicRequestHandler));
2348     CmiSyncSendAndFree(0, size, msg);
2349   }
2350 }
2351
2352 void CmiSendReduce(CmiReduction *red) {
2353   void *mergedData, *msg;
2354   int msg_size;
2355   if (!red->localContributed || red->numChildren != red->numRemoteReceived) return;
2356   mergedData = red->localData;
2357   msg_size = red->localSize;
2358   if (red->numChildren > 0) {
2359     int i, offset=0;
2360     if (red->ops.pupFn != NULL) {
2361       offset = CmiReservedHeaderSize;
2362       for (i=0; i<red->numChildren; ++i) red->remoteData[i] += offset;
2363     }
2364     mergedData = (red->ops.mergeFn)(&msg_size, red->localData, (void **)red->remoteData, red->numChildren);
2365     for (i=0; i<red->numChildren; ++i) CmiFree(red->remoteData[i] - offset);
2366   }
2367   /*CpvAccess(_reduce_num_children) = 0;*/
2368   /*CpvAccess(_reduce_received) = 0;*/
2369   msg = mergedData;
2370   if (red->parent != -1) {
2371     if (red->ops.pupFn != NULL) {
2372       pup_er p = pup_new_sizer();
2373       (red->ops.pupFn)(p, mergedData);
2374       msg_size = pup_size(p) + CmiReservedHeaderSize;
2375       pup_destroy(p);
2376       msg = CmiAlloc(msg_size);
2377       p = pup_new_toMem((void*)(((char*)msg)+CmiReservedHeaderSize));
2378       (red->ops.pupFn)(p, mergedData);
2379       pup_destroy(p);
2380       if (red->ops.deleteFn != NULL) (red->ops.deleteFn)(red->localData);
2381     }
2382     CmiSetHandler(msg, CpvAccess(CmiReductionMessageHandler));
2383     CmiSetRedID(msg, red->seqID);
2384     /*CmiPrintf("CmiSendReduce(%d): sending %d bytes to %d\n",CmiMyPe(),msg_size,red->parent);*/
2385     CmiSyncSendAndFree(red->parent, msg_size, msg);
2386   } else {
2387     (red->ops.destination)(msg);
2388   }
2389   CmiClearReduction(red->seqID);
2390 }
2391
2392 void *CmiReduceMergeFn_random(int *size, void *data, void** remote, int n) {
2393   return data;
2394 }
2395
2396 static void CmiGlobalReduce(void *msg, int size, CmiReduceMergeFn mergeFn, CmiReduction *red) {
2397   CmiAssert(red->localContributed == 0);
2398   red->localContributed = 1;
2399   red->localData = msg;
2400   red->localSize = size;
2401   red->numChildren = CmiNumSpanTreeChildren(CmiMyPe());
2402   red->parent = CmiSpanTreeParent(CmiMyPe());
2403   red->ops.destination = (CmiHandler)CmiGetHandlerFunction(msg);
2404   red->ops.mergeFn = mergeFn;
2405   red->ops.pupFn = NULL;
2406   /*CmiPrintf("[%d] CmiReduce::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);*/
2407   CmiSendReduce(red);
2408 }
2409
2410 static void CmiGlobalReduceStruct(void *data, CmiReducePupFn pupFn,
2411                      CmiReduceMergeFn mergeFn, CmiHandler dest,
2412                      CmiReduceDeleteFn deleteFn, CmiReduction *red) {
2413   CmiAssert(red->localContributed == 0);
2414   red->localContributed = 1;
2415   red->localData = data;
2416   red->localSize = 0;
2417   red->numChildren = CmiNumSpanTreeChildren(CmiMyPe());
2418   red->parent = CmiSpanTreeParent(CmiMyPe());
2419   red->ops.destination = dest;
2420   red->ops.mergeFn = mergeFn;
2421   red->ops.pupFn = pupFn;
2422   red->ops.deleteFn = deleteFn;
2423   /*CmiPrintf("[%d] CmiReduceStruct::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);*/
2424   CmiSendReduce(red);
2425 }
2426
2427 void CmiReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
2428   CmiReduction *red = CmiGetNextReduction(CmiNumSpanTreeChildren(CmiMyPe()));
2429   CmiGlobalReduce(msg, size, mergeFn, red);
2430 }
2431
2432 void CmiReduceStruct(void *data, CmiReducePupFn pupFn,
2433                      CmiReduceMergeFn mergeFn, CmiHandler dest,
2434                      CmiReduceDeleteFn deleteFn) {
2435   CmiReduction *red = CmiGetNextReduction(CmiNumSpanTreeChildren(CmiMyPe()));
2436   CmiGlobalReduceStruct(data, pupFn, mergeFn, dest, deleteFn, red);
2437 }
2438
2439 void CmiReduceID(void *msg, int size, CmiReduceMergeFn mergeFn, CmiReductionID id) {
2440   CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
2441   CmiGlobalReduce(msg, size, mergeFn, red);
2442 }
2443
2444 void CmiReduceStructID(void *data, CmiReducePupFn pupFn,
2445                      CmiReduceMergeFn mergeFn, CmiHandler dest,
2446                      CmiReduceDeleteFn deleteFn, CmiReductionID id) {
2447   CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
2448   CmiGlobalReduceStruct(data, pupFn, mergeFn, dest, deleteFn, red);
2449 }
2450
2451 void CmiListReduce(int npes, int *pes, void *msg, int size, CmiReduceMergeFn mergeFn, CmiReductionID id) {
2452   CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
2453   int myPos;
2454   CmiAssert(red->localContributed == 0);
2455   red->localContributed = 1;
2456   red->localData = msg;
2457   red->localSize = size;
2458   for (myPos=0; myPos<npes; ++myPos) {
2459     if (pes[myPos] == CmiMyPe()) break;
2460   }
2461   CmiAssert(myPos < npes);
2462   red->numChildren = npes - (myPos << 2) - 1;
2463   if (red->numChildren > 4) red->numChildren = 4;
2464   if (red->numChildren < 0) red->numChildren = 0;
2465   if (myPos == 0) red->parent = -1;
2466   else red->parent = pes[(myPos - 1) >> 2];
2467   red->ops.destination = (CmiHandler)CmiGetHandlerFunction(msg);
2468   red->ops.mergeFn = mergeFn;
2469   red->ops.pupFn = NULL;
2470   /*CmiPrintf("[%d] CmiListReduce::local %hd parent=%d, numChildren=%d\n",CmiMyPe(),red->seqID,red->parent,red->numChildren);*/
2471   CmiSendReduce(red);
2472 }
2473
2474 void CmiListReduceStruct(int npes, int *pes,
2475                      void *data, CmiReducePupFn pupFn,
2476                      CmiReduceMergeFn mergeFn, CmiHandler dest,
2477                      CmiReduceDeleteFn deleteFn, CmiReductionID id) {
2478   CmiReduction *red = CmiGetReductionCreate(id, CmiNumSpanTreeChildren(CmiMyPe()));
2479   int myPos;
2480   CmiAssert(red->localContributed == 0);
2481   red->localContributed = 1;
2482   red->localData = data;
2483   red->localSize = 0;
2484   for (myPos=0; myPos<npes; ++myPos) {
2485     if (pes[myPos] == CmiMyPe()) break;
2486   }
2487   CmiAssert(myPos < npes);
2488   red->numChildren = npes - (myPos << 2) - 1;
2489   if (red->numChildren > 4) red->numChildren = 4;
2490   if (red->numChildren < 0) red->numChildren = 0;
2491   red->parent = (myPos - 1) >> 2;
2492   if (myPos == 0) red->parent = -1;
2493   red->ops.destination = dest;
2494   red->ops.mergeFn = mergeFn;
2495   red->ops.pupFn = pupFn;
2496   red->ops.deleteFn = deleteFn;
2497   CmiSendReduce(red);
2498 }
2499
2500 void CmiGroupReduce(CmiGroup grp, void *msg, int size, CmiReduceMergeFn mergeFn, CmiReductionID id) {
2501   int npes, *pes;
2502   CmiLookupGroup(grp, &npes, &pes);
2503   CmiListReduce(npes, pes, msg, size, mergeFn, id);
2504 }
2505
2506 void CmiGroupReduceStruct(CmiGroup grp, void *data, CmiReducePupFn pupFn,
2507                      CmiReduceMergeFn mergeFn, CmiHandler dest,
2508                      CmiReduceDeleteFn deleteFn, CmiReductionID id) {
2509   int npes, *pes;
2510   CmiLookupGroup(grp, &npes, &pes);
2511   CmiListReduceStruct(npes, pes, data, pupFn, mergeFn, dest, deleteFn, id);
2512 }
2513
2514 void CmiNodeReduce(void *data, int size, CmiReduceMergeFn mergeFn, int redID, int numChildren, int parent) {
2515   CmiAbort("Feel free to implement CmiNodeReduce...");
2516   /*
2517   CmiAssert(CmiRankOf(CmiMyPe()) == 0);
2518   CpvAccess(_reduce_data) = data;
2519   CpvAccess(_reduce_data_size) = size;
2520   CpvAccess(_reduce_parent) = CmiNodeFirst(CmiNodeSpanTreeParent(CmiMyNode()));
2521   _reduce_destination = (CmiHandler)CmiGetHandlerFunction(data);
2522   _reduce_pupFn = NULL;
2523   _reduce_mergeFn = mergeFn;
2524   CpvAccess(_reduce_num_children) = CmiNumNodeSpanTreeChildren(CmiMyNode());
2525   if (CpvAccess(_reduce_received) == CpvAccess(_reduce_num_children)) CmiSendReduce(size);
2526   */
2527 }
2528 #if 0
2529 void CmiNodeReduce(void *data, int size, void * (*mergeFn)(void*,void**,int), int redID) {
2530   CmiNodeReduce(data, size, mergeFn, redID, CmiNumNodeSpanTreeChildren(CmiMyNode()),
2531       CmiNodeFirst(CmiNodeSpanTreeParent(CmiMyNode())));
2532 }
2533 void CmiNodeReduce(void *data, int size, void * (*mergeFn)(void*,void**,int), int numChildren, int parent) {
2534   CmiNodeReduce(data, size, mergeFn, CmiReduceNextID(), numChildren, parent);
2535 }
2536 void CmiNodeReduce(void *data, int size, void * (*mergeFn)(void*,void**,int)) {
2537   CmiNodeReduce(data, size, mergeFn, CmiReduceNextID(), CmiNumNodeSpanTreeChildren(CmiMyNode()),
2538       CmiNodeFirst(CmiNodeSpanTreeParent(CmiMyNode())));
2539 }
2540 #endif
2541
2542 void CmiNodeReduceStruct(void *data, CmiReducePupFn pupFn,
2543                          CmiReduceMergeFn mergeFn, CmiHandler dest,
2544                          CmiReduceDeleteFn deleteFn) {
2545   CmiAbort("Feel free to implement CmiNodeReduceStruct...");
2546 /*
2547   CmiAssert(CmiRankOf(CmiMyPe()) == 0);
2548   CpvAccess(_reduce_data) = data;
2549   CpvAccess(_reduce_parent) = CmiNodeFirst(CmiNodeSpanTreeParent(CmiMyNode()));
2550   _reduce_destination = dest;
2551   _reduce_pupFn = pupFn;
2552   _reduce_mergeFn = mergeFn;
2553   _reduce_deleteFn = deleteFn;
2554   CpvAccess(_reduce_num_children) = CmiNumNodeSpanTreeChildren(CmiMyNode());
2555   if (CpvAccess(_reduce_received) == CpvAccess(_reduce_num_children)) CmiSendReduce(0);
2556   */
2557 }
2558
2559 void CmiHandleReductionMessage(void *msg) {
2560   CmiReduction *red = CmiGetReduction(CmiGetRedID(msg));
2561   if (red->numRemoteReceived == red->numChildren) red = CmiGetReductionCreate(CmiGetRedID(msg), red->numChildren+4);
2562   red->remoteData[red->numRemoteReceived++] = msg;
2563   /*CmiPrintf("[%d] CmiReduce::remote %hd\n",CmiMyPe(),red->seqID);*/
2564   CmiSendReduce(red);
2565 /*
2566   CpvAccess(_reduce_msg_list)[CpvAccess(_reduce_received)++] = msg;
2567   if (CpvAccess(_reduce_received) == CpvAccess(_reduce_num_children)) CmiSendReduce();
2568   / *else CmiPrintf("CmiHandleReductionMessage(%d): %d - %d\n",CmiMyPe(),CpvAccess(_reduce_received),CpvAccess(_reduce_num_children));*/
2569 }
2570
2571 void CmiReductionsInit() {
2572   int i;
2573   CpvInitialize(int, CmiReductionMessageHandler);
2574   CpvAccess(CmiReductionMessageHandler) = CmiRegisterHandler((CmiHandler)CmiHandleReductionMessage);
2575   CpvInitialize(int, CmiReductionDynamicRequestHandler);
2576   CpvAccess(CmiReductionDynamicRequestHandler) = CmiRegisterHandler((CmiHandler)CmiReductionHandleDynamicRequest);
2577   CpvInitialize(CmiUInt2, _reduce_seqID_global);
2578   CpvAccess(_reduce_seqID_global) = CmiReductionID_globalOffset;
2579   CpvInitialize(CmiUInt2, _reduce_seqID_request);
2580   CpvAccess(_reduce_seqID_request) = CmiReductionID_requestOffset;
2581   CpvInitialize(CmiUInt2, _reduce_seqID_dynamic);
2582   CpvAccess(_reduce_seqID_dynamic) = CmiReductionID_dynamicOffset;
2583   CpvInitialize(int, _reduce_info_size);
2584   CpvAccess(_reduce_info_size) = 4;
2585   CpvInitialize(CmiReduction**, _reduce_info);
2586   CpvAccess(_reduce_info) = malloc(16*sizeof(CmiReduction*));
2587   for (i=0; i<16; ++i) CpvAccess(_reduce_info)[i] = NULL;
2588 }
2589
2590 /*****************************************************************************
2591  *
2592  * Multicast groups
2593  *
2594  ****************************************************************************/
2595
2596 #if CMK_MULTICAST_DEF_USE_COMMON_CODE
2597
2598 typedef struct GroupDef
2599 {
2600   union {
2601     char core[CmiMsgHeaderSizeBytes];
2602     struct GroupDef *next;
2603   } core;
2604   CmiGroup group;
2605   int npes;
2606   int pes[1];
2607 }
2608 *GroupDef;
2609
2610 #define GROUPTAB_SIZE 101
2611
2612 CpvStaticDeclare(int, CmiGroupHandlerIndex);
2613 CpvStaticDeclare(int, CmiGroupCounter);
2614 CpvStaticDeclare(GroupDef *, CmiGroupTable);
2615
2616 void CmiGroupHandler(GroupDef def)
2617 {
2618   /* receive group definition, insert into group table */
2619   GroupDef *table = CpvAccess(CmiGroupTable);
2620   unsigned int hashval, bucket;
2621   hashval = (def->group.id ^ def->group.pe);
2622   bucket = hashval % GROUPTAB_SIZE;
2623   def->core.next = table[bucket];
2624   table[bucket] = def;
2625 }
2626
2627 CmiGroup CmiEstablishGroup(int npes, int *pes)
2628 {
2629   /* build new group definition, broadcast it */
2630   CmiGroup grp; GroupDef def; int len, i;
2631   grp.id = CpvAccess(CmiGroupCounter)++;
2632   grp.pe = CmiMyPe();
2633   len = sizeof(struct GroupDef)+(npes*sizeof(int));
2634   def = (GroupDef)CmiAlloc(len);
2635   def->group = grp;
2636   def->npes = npes;
2637   for (i=0; i<npes; i++)
2638     def->pes[i] = pes[i];
2639   CmiSetHandler(def, CpvAccess(CmiGroupHandlerIndex));
2640   CmiSyncBroadcastAllAndFree(len, def);
2641   return grp;
2642 }
2643
2644 void CmiLookupGroup(CmiGroup grp, int *npes, int **pes)
2645 {
2646   unsigned int hashval, bucket;  GroupDef def;
2647   GroupDef *table = CpvAccess(CmiGroupTable);
2648   hashval = (grp.id ^ grp.pe);
2649   bucket = hashval % GROUPTAB_SIZE;
2650   for (def=table[bucket]; def; def=def->core.next) {
2651     if ((def->group.id == grp.id)&&(def->group.pe == grp.pe)) {
2652       *npes = def->npes;
2653       *pes = def->pes;
2654       return;
2655     }
2656   }
2657   *npes = 0; *pes = 0;
2658 }
2659
2660 void CmiGroupInit()
2661 {
2662   CpvInitialize(int, CmiGroupHandlerIndex);
2663   CpvInitialize(int, CmiGroupCounter);
2664   CpvInitialize(GroupDef *, CmiGroupTable);
2665   CpvAccess(CmiGroupHandlerIndex) = CmiRegisterHandler((CmiHandler)CmiGroupHandler);
2666   CpvAccess(CmiGroupCounter) = 0;
2667   CpvAccess(CmiGroupTable) =
2668     (GroupDef*)calloc(GROUPTAB_SIZE, sizeof(GroupDef));
2669   if (CpvAccess(CmiGroupTable) == 0)
2670     CmiAbort("Memory Allocation Error");
2671 }
2672
2673 #endif
2674
2675 /*****************************************************************************
2676  *
2677  * Common List-Cast and Multicast Code
2678  *
2679  ****************************************************************************/
2680
2681 #if CMK_MULTICAST_LIST_USE_COMMON_CODE
2682
2683 void CmiSyncListSendFn(int npes, int *pes, int len, char *msg)
2684 {
2685   int i;
2686 #if CMK_BROADCAST_USE_CMIREFERENCE
2687   for(i=0;i<npes;i++) {
2688     if (pes[i] == CmiMyPe())
2689       CmiSyncSend(pes[i], len, msg);
2690     else {
2691       CmiReference(msg);
2692       CmiSyncSendAndFree(pes[i], len, msg);
2693     }
2694   }
2695 #else
2696   for(i=0;i<npes;i++) {
2697     CmiSyncSend(pes[i], len, msg);
2698   }
2699 #endif
2700 }
2701
2702 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int len, char *msg)
2703 {
2704   /* A better asynchronous implementation may be wanted, but at least it works */
2705   CmiSyncListSendFn(npes, pes, len, msg);
2706   return (CmiCommHandle) 0;
2707 }
2708
2709 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
2710 {
2711 #if CMK_BROADCAST_USE_CMIREFERENCE
2712   if (npes == 1) {
2713     CmiSyncSendAndFree(pes[0], len, msg);
2714     return;
2715   }
2716   CmiSyncListSendFn(npes, pes, len, msg);
2717   CmiFree(msg);
2718 #else
2719   int i;
2720   for(i=0;i<npes-1;i++) {
2721     CmiSyncSend(pes[i], len, msg);
2722   }
2723   if (npes>0)
2724     CmiSyncSendAndFree(pes[npes-1], len, msg);
2725   else 
2726     CmiFree(msg);
2727 #endif
2728 }
2729
2730 #endif
2731
2732 #if CMK_MULTICAST_GROUP_USE_COMMON_CODE
2733
2734 typedef struct MultiMsg
2735 {
2736   char core[CmiMsgHeaderSizeBytes];
2737   CmiGroup group;
2738   int pos;
2739   int origlen;
2740 }
2741 *MultiMsg;
2742
2743
2744 CpvDeclare(int, CmiMulticastHandlerIndex);
2745
2746 void CmiMulticastDeliver(MultiMsg msg)
2747 {
2748   int npes, *pes; int olen, nlen, pos, child1, child2;
2749   olen = msg->origlen;
2750   nlen = olen + sizeof(struct MultiMsg);
2751   CmiLookupGroup(msg->group, &npes, &pes);
2752   if (pes==0) {
2753     CmiSyncSendAndFree(CmiMyPe(), nlen, msg);
2754     return;
2755   }
2756   if (npes==0) {
2757     CmiFree(msg);
2758     return;
2759   }
2760   if (msg->pos == -1) {
2761     msg->pos=0;
2762     CmiSyncSendAndFree(pes[0], nlen, msg);
2763     return;
2764   }
2765   pos = msg->pos;
2766   child1 = ((pos+1)<<1);
2767   child2 = child1-1;
2768   if (child1 < npes) {
2769     msg->pos = child1;
2770     CmiSyncSend(pes[child1], nlen, msg);
2771   }
2772   if (child2 < npes) {
2773     msg->pos = child2;
2774     CmiSyncSend(pes[child2], nlen, msg);
2775   }
2776   if(olen < sizeof(struct MultiMsg)) {
2777     memcpy(msg, msg+1, olen);
2778   } else {
2779     memcpy(msg, (((char*)msg)+olen), sizeof(struct MultiMsg));
2780   }
2781   CmiSyncSendAndFree(CmiMyPe(), olen, msg);
2782 }
2783
2784 void CmiMulticastHandler(MultiMsg msg)
2785 {
2786   CmiMulticastDeliver(msg);
2787 }
2788
2789 void CmiSyncMulticastFn(CmiGroup grp, int len, char *msg)
2790 {
2791   int newlen; MultiMsg newmsg;
2792   newlen = len + sizeof(struct MultiMsg);
2793   newmsg = (MultiMsg)CmiAlloc(newlen);
2794   if(len < sizeof(struct MultiMsg)) {
2795     memcpy(newmsg+1, msg, len);
2796   } else {
2797     memcpy(newmsg+1, msg+sizeof(struct MultiMsg), len-sizeof(struct MultiMsg));
2798     memcpy(((char *)newmsg+len), msg, sizeof(struct MultiMsg));
2799   }
2800   newmsg->group = grp;
2801   newmsg->origlen = len;
2802   newmsg->pos = -1;
2803   CmiSetHandler(newmsg, CpvAccess(CmiMulticastHandlerIndex));
2804   CmiMulticastDeliver(newmsg);
2805 }
2806
2807 void CmiFreeMulticastFn(CmiGroup grp, int len, char *msg)
2808 {
2809   CmiSyncMulticastFn(grp, len, msg);
2810   CmiFree(msg);
2811 }
2812
2813 CmiCommHandle CmiAsyncMulticastFn(CmiGroup grp, int len, char *msg)
2814 {
2815   CmiError("Async Multicast not implemented.");
2816   return (CmiCommHandle) 0;
2817 }
2818
2819 void CmiMulticastInit()
2820 {
2821   CpvInitialize(int, CmiMulticastHandlerIndex);
2822   CpvAccess(CmiMulticastHandlerIndex) =
2823     CmiRegisterHandler((CmiHandler)CmiMulticastHandler);
2824 }
2825
2826 #endif
2827
2828 #if CONVERSE_VERSION_SHMEM && CMK_ARENA_MALLOC
2829 extern void *arena_malloc(int size);
2830 extern void arena_free(void *blockPtr);
2831 #endif
2832
2833 /***************************************************************************
2834  *
2835  * Memory Allocation routines 
2836  *
2837  * A block of memory can consist of multiple chunks.  Each chunk has
2838  * a sizefield and a refcount.  The first chunk's refcount is a reference
2839  * count.  That's how many CmiFrees it takes to free the message.
2840  * Subsequent chunks have a refcount which is less than zero.  This is
2841  * the offset back to the start of the first chunk.
2842  *
2843  * Each chunk has a CmiChunkHeader before the user data, with the fields:
2844  *
2845  *  size: The user-allocated size of the chunk, in bytes.
2846  *
2847  *  ref: A magic reference count object. Ordinary blocks start with
2848  *     reference count 1.  When the reference count reaches zero,
2849  *     the block is deleted.  To support nested buffers, the 
2850  *     reference count can also be negative, which means it is 
2851  *     a byte offset to the enclosing buffer's reference count.
2852  *
2853  ***************************************************************************/
2854
2855
2856 void *CmiAlloc(int size)
2857 {
2858
2859   char *res;
2860
2861 #if CONVERSE_VERSION_ELAN
2862   res = (char *) elan_CmiAlloc(size+sizeof(CmiChunkHeader));
2863 #elif CONVERSE_VERSION_VMI
2864   res = (char *) CMI_VMI_CmiAlloc(size+sizeof(CmiChunkHeader));
2865 #elif CONVERSE_VERSION_SHMEM && CMK_ARENA_MALLOC
2866   res = (char*) arena_malloc(size+sizeof(CmiChunkHeader));
2867 #elif CMK_USE_IBVERBS | CMK_USE_IBUD
2868   res = (char *) infi_CmiAlloc(size+sizeof(CmiChunkHeader));
2869 #elif CMK_CONVERSE_GEMINI_UGNI
2870   res =(char *) LrtsAlloc(size, sizeof(CmiChunkHeader));
2871 #elif CONVERSE_POOL
2872   res =(char *) CmiPoolAlloc(size+sizeof(CmiChunkHeader));
2873 #elif USE_MPI_CTRLMSG_SCHEME && CMK_CONVERSE_MPI
2874   MPI_Alloc_mem(size+sizeof(CmiChunkHeader), MPI_INFO_NULL, &res);
2875 #elif CMK_SMP && CMK_BLUEGENEQ && CMK_USE_L2ATOMICS
2876   res = (char *) CmiAlloc_bgq(size+sizeof(CmiChunkHeader));
2877 #else
2878   res =(char *) malloc_nomigrate(size+sizeof(CmiChunkHeader));
2879 #endif
2880
2881   _MEMCHECK(res);
2882
2883 #ifdef MEMMONITOR
2884   CpvAccess(MemoryUsage) += size+sizeof(CmiChunkHeader);
2885   CpvAccess(AllocCount)++;
2886   CpvAccess(BlocksAllocated)++;
2887   if (CpvAccess(MemoryUsage) > CpvAccess(HiWaterMark)) {
2888     CpvAccess(HiWaterMark) = CpvAccess(MemoryUsage);
2889   }
2890   if (CpvAccess(MemoryUsage) > 1.1 * CpvAccess(ReportedHiWaterMark)) {
2891     CmiPrintf("HIMEM STAT PE%d: %d Allocs, %d blocks, %lu K, Max %lu K\n",
2892             CmiMyPe(), CpvAccess(AllocCount), CpvAccess(BlocksAllocated),
2893             CpvAccess(MemoryUsage)/1024, CpvAccess(HiWaterMark)/1024);
2894     CpvAccess(ReportedHiWaterMark) = CpvAccess(MemoryUsage);
2895   }
2896   if ((CpvAccess(AllocCount) % 1000) == 0) {
2897     CmiPrintf("MEM STAT PE%d: %d Allocs, %d blocks, %lu K, Max %lu K\n",
2898             CmiMyPe(), CpvAccess(AllocCount), CpvAccess(BlocksAllocated),
2899             CpvAccess(MemoryUsage)/1024, CpvAccess(HiWaterMark)/1024);
2900   }
2901 #endif
2902
2903   res+=sizeof(CmiChunkHeader);
2904   SIZEFIELD(res)=size;
2905   REFFIELD(res)=1;
2906   return (void *)res;
2907 }
2908
2909 /** Follow the header links out to the most enclosing block */
2910 static void *CmiAllocFindEnclosing(void *blk) {
2911   int refCount = REFFIELD(blk);
2912   while (refCount < 0) {
2913     blk = (void *)((char*)blk+refCount); /* Jump to enclosing block */
2914     refCount = REFFIELD(blk);
2915   }
2916   return blk;
2917 }
2918
2919 int CmiGetReference(void *blk)
2920 {
2921   return REFFIELD(CmiAllocFindEnclosing(blk));
2922 }
2923
2924 /** Increment the reference count for this block's owner.
2925     This call must be matched by an equivalent CmiFree. */
2926 void CmiReference(void *blk)
2927 {
2928   REFFIELD(CmiAllocFindEnclosing(blk))++;
2929 }
2930
2931 /** Return the size of the user portion of this block. */
2932 int CmiSize(void *blk)
2933 {
2934   return SIZEFIELD(blk);
2935 }
2936
2937 /** Decrement the reference count for this block. */
2938 void CmiFree(void *blk)
2939 {
2940   void *parentBlk=CmiAllocFindEnclosing(blk);
2941   int refCount=REFFIELD(parentBlk);
2942 #if CMK_ERROR_CHECKING
2943   if(refCount==0) /* Logic error: reference count shouldn't already have been zero */
2944     CmiAbort("CmiFree reference count was zero-- is this a duplicate free?");
2945 #endif
2946   refCount--;
2947   REFFIELD(parentBlk) = refCount;
2948   if(refCount==0) { /* This was the last reference to the block-- free it */
2949 #ifdef MEMMONITOR
2950     int size=SIZEFIELD(parentBlk);
2951     if (size > 1000000000) /* Absurdly large size field-- warning */
2952       CmiPrintf("MEMSTAT Uh-oh -- SIZEFIELD=%d\n",size);
2953     CpvAccess(MemoryUsage) -= (size + sizeof(CmiChunkHeader));
2954     CpvAccess(BlocksAllocated)--;
2955 #endif
2956
2957 #if CONVERSE_VERSION_ELAN
2958     elan_CmiFree(BLKSTART(parentBlk));
2959 #elif CONVERSE_VERSION_VMI
2960     CMI_VMI_CmiFree(BLKSTART(parentBlk));
2961 #elif CONVERSE_VERSION_SHMEM && CMK_ARENA_MALLOC
2962     arena_free(BLKSTART(parentBlk));
2963 #elif CMK_USE_IBVERBS | CMK_USE_IBUD
2964     /* is this message the head of a MultipleSend that we received?
2965        Then the parts with INFIMULTIPOOL have metadata which must be 
2966        unregistered and freed.  */
2967 #ifdef CMK_IBVERS_CLEAN_MULTIPLESEND
2968     if(CmiGetHandler(parentBlk)==CpvAccess(CmiMainHandlerIDP))
2969       {
2970         infi_freeMultipleSend(parentBlk);
2971       }
2972 #endif
2973     infi_CmiFree(BLKSTART(parentBlk));
2974 #elif CMK_CONVERSE_GEMINI_UGNI
2975     LrtsFree(BLKSTART(parentBlk));
2976 #elif CONVERSE_POOL
2977     CmiPoolFree(BLKSTART(parentBlk));
2978 #elif USE_MPI_CTRLMSG_SCHEME && CMK_CONVERSE_MPI
2979     MPI_Free_mem(parentBlk);
2980 #elif CMK_SMP && CMK_BLUEGENEQ && CMK_USE_L2ATOMICS
2981     CmiFree_bgq(BLKSTART(parentBlk));
2982 #else
2983     free_nomigrate(BLKSTART(parentBlk));
2984 #endif
2985   }
2986 }
2987
2988
2989 /***************************************************************************
2990  *
2991  * Temporary-memory Allocation routines 
2992  *
2993  *  This buffer augments the storage available on the regular machine stack
2994  * for fairly large temporary buffers, which allows us to use smaller machine
2995  * stacks.
2996  *
2997  ***************************************************************************/
2998
2999 #define CMI_TMP_BUF_MAX 128*1024 /* Allow this much temporary storage. */
3000
3001 typedef struct {
3002   char *buf; /* Start of temporary buffer */
3003   int cur; /* First unused location in temporary buffer */
3004   int max; /* Length of temporary buffer */
3005 } CmiTmpBuf_t;
3006 CpvDeclare(CmiTmpBuf_t,CmiTmpBuf); /* One temporary buffer per PE */
3007
3008 static void CmiTmpSetup(CmiTmpBuf_t *b) {
3009   b->buf=malloc(CMI_TMP_BUF_MAX);
3010   b->cur=0;
3011   b->max=CMI_TMP_BUF_MAX;
3012 }
3013
3014 void *CmiTmpAlloc(int size) {
3015   if (!CpvInitialized(CmiTmpBuf)) {
3016     return malloc(size);
3017   }
3018   else { /* regular case */
3019     CmiTmpBuf_t *b=&CpvAccess(CmiTmpBuf);
3020     void *t;
3021     if (b->cur+size>b->max) {
3022       if (b->max==0) /* We're just uninitialized */
3023         CmiTmpSetup(b);
3024       else /* We're really out of space! */
3025         CmiAbort("CmiTmpAlloc: asked for too much temporary buffer space");
3026     }
3027     t=b->buf+b->cur;
3028     b->cur+=size;
3029     return t;
3030   }
3031 }
3032 void CmiTmpFree(void *t) {
3033   if (!CpvInitialized(CmiTmpBuf)) {
3034     free(t);
3035   }
3036   else { /* regular case */
3037     CmiTmpBuf_t *b=&CpvAccess(CmiTmpBuf);
3038     /* t should point into our temporary buffer: figure out where */
3039     int cur=((const char *)t)-b->buf;
3040 #if CMK_ERROR_CHECKING
3041     if (cur<0 || cur>b->max)
3042       CmiAbort("CmiTmpFree: called with an invalid pointer");
3043 #endif
3044     b->cur=cur;
3045   }
3046 }
3047
3048 void CmiTmpInit(char **argv) {
3049   CpvInitialize(CmiTmpBuf_t,CmiTmpBuf);
3050   /* Set up this processor's temporary buffer */
3051   CmiTmpSetup(&CpvAccess(CmiTmpBuf));
3052 }
3053
3054 /******************************************************************************
3055
3056   Cross-platform directory creation
3057
3058   ****************************************************************************/
3059 #ifdef _MSC_VER
3060 /* Windows directory creation: */
3061 #include <windows.h>
3062
3063 void CmiMkdir(const char *dirName) {
3064         CreateDirectory(dirName,NULL);
3065 }
3066
3067 #else /* !_MSC_VER */
3068 /* UNIX directory creation */
3069 #include <unistd.h> 
3070 #include <sys/stat.h> /* from "mkdir" man page */
3071 #include <sys/types.h>
3072
3073 void CmiMkdir(const char *dirName) {
3074 #ifndef __MINGW_H
3075         mkdir(dirName,0777);
3076 #else
3077         mkdir(dirName);
3078 #endif
3079 }
3080
3081 #endif
3082
3083
3084 /******************************************************************************
3085
3086   Multiple Send function                               
3087
3088   ****************************************************************************/
3089
3090
3091
3092
3093
3094 /****************************************************************************
3095 * DESCRIPTION : This function call allows the user to send multiple messages
3096 *               from one processor to another, all intended for differnet 
3097 *               handlers.
3098 *
3099 *               Parameters :
3100 *
3101 *               destPE, len, int sizes[0..len-1], char *messages[0..len-1]
3102 *
3103 ****************************************************************************/
3104 /* Round up message size to the message granularity. 
3105    Does this by adding, then truncating.
3106 */
3107 static int roundUpSize(unsigned int s) {
3108   return (int)((s+sizeof(double)-1)&~(sizeof(double)-1));
3109 }
3110 /* Return the amount of message padding required for a message
3111    with this many user bytes. 
3112  */
3113 static int paddingSize(unsigned int s) {
3114   return roundUpSize(s)-s;
3115 }
3116
3117 /* Message header for a bundle of multiple-sent messages */
3118 typedef struct {
3119   char convHeader[CmiMsgHeaderSizeBytes];
3120   int nMessages; /* Number of distinct messages bundled below. */
3121   double pad; /* To align the first message, which follows this header */
3122 } CmiMultipleSendHeader;
3123
3124 #if CMK_USE_IBVERBS | CMK_USE_IBUD
3125 /* given a pointer to a multisend message clean up the metadata */
3126
3127 void infi_freeMultipleSend(void *msgWhole)
3128 {
3129   int len=((CmiMultipleSendHeader *)msgWhole)->nMessages;
3130   double pad=((CmiMultipleSendHeader *)msgWhole)->pad;
3131   int offset=sizeof(CmiMultipleSendHeader);
3132   int m;
3133   void *thisMsg=NULL;
3134   if (pad != 1234567.89) return;
3135   for(m=0;m<len;m++)
3136     {
3137       /*unreg meta, free meta, move the ptr */
3138       /* note these weird little things are not pooled */
3139       /* do NOT free the message here, we are only a part of this buffer*/
3140       infiCmiChunkHeader *ch=(infiCmiChunkHeader *)(msgWhole+offset);
3141       char *msg=(msgWhole+offset+sizeof(infiCmiChunkHeader));
3142       int msgSize=ch->chunkHeader.size; /* Size of user portion of message (plus padding at end) */
3143       infi_unregAndFreeMeta(ch->metaData);
3144       offset+= sizeof(infiCmiChunkHeader) + msgSize;
3145     }
3146 }
3147 #endif
3148
3149
3150 static void _CmiMultipleSend(unsigned int destPE, int len, int sizes[], char *msgComps[], int immed)
3151 {
3152   CmiMultipleSendHeader header;
3153   int m; /* Outgoing message */
3154
3155 #if CMK_USE_IBVERBS
3156   infiCmiChunkHeader *msgHdr;
3157 #else
3158   CmiChunkHeader *msgHdr; /* Chunk headers for each message */
3159 #endif
3160         
3161   double pad = 0; /* padding required */
3162   int vecLen; /* Number of pieces in outgoing message vector */
3163   int *vecSizes; /* Sizes of each piece we're sending out. */
3164   char **vecPtrs; /* Pointers to each piece we're sending out. */
3165   int vec; /* Entry we're currently filling out in above array */
3166         
3167 #if CMK_USE_IBVERBS
3168   msgHdr = (infiCmiChunkHeader *)CmiTmpAlloc(len * sizeof(infiCmiChunkHeader));
3169 #else
3170   msgHdr = (CmiChunkHeader *)CmiTmpAlloc(len * sizeof(CmiChunkHeader));
3171 #endif
3172         
3173   /* Allocate memory for the outgoing vector*/
3174   vecLen=1+3*len; /* Header and 3 parts per message */
3175   vecSizes = (int *)CmiTmpAlloc(vecLen * sizeof(int));
3176   vecPtrs = (char **)CmiTmpAlloc(vecLen * sizeof(char *));
3177   vec=0;
3178   
3179   /* Build the header */
3180   header.nMessages=len;
3181   CmiSetHandler(&header, CpvAccess(CmiMainHandlerIDP));
3182   header.pad = 1234567.89;
3183 #if CMK_IMMEDIATE_MSG
3184   if (immed) CmiBecomeImmediate(&header);
3185 #endif
3186   vecSizes[vec]=sizeof(header); vecPtrs[vec]=(char *)&header;
3187   vec++;
3188
3189   /* Build an entry for each message: 
3190          | CmiChunkHeader | Message data | Message padding | ...next message entry ...
3191   */
3192   for (m=0;m<len;m++) {
3193 #if CMK_USE_IBVERBS
3194     msgHdr[m].chunkHeader.size=roundUpSize(sizes[m]); /* Size of message and padding */
3195     msgHdr[m].chunkHeader.ref=0; /* Reference count will be filled out on receive side */
3196     msgHdr[m].metaData=NULL;
3197 #else
3198     msgHdr[m].size=roundUpSize(sizes[m]); /* Size of message and padding */
3199     msgHdr[m].ref=0; /* Reference count will be filled out on receive side */
3200 #endif          
3201     
3202     /* First send the message's CmiChunkHeader (for use on receive side) */
3203 #if CMK_USE_IBVERBS
3204     vecSizes[vec]=sizeof(infiCmiChunkHeader);
3205 #else
3206     vecSizes[vec]=sizeof(CmiChunkHeader); 
3207 #endif          
3208                 vecPtrs[vec]=(char *)&msgHdr[m];
3209     vec++;
3210     
3211     /* Now send the actual message data */
3212     vecSizes[vec]=sizes[m]; vecPtrs[vec]=msgComps[m];
3213     vec++;
3214     
3215     /* Now send padding to align the next message on a double-boundary */
3216     vecSizes[vec]=paddingSize(sizes[m]); vecPtrs[vec]=(char *)&pad;
3217     vec++;
3218   }
3219   CmiAssert(vec==vecLen);
3220   
3221   CmiSyncVectorSend(destPE, vecLen, vecSizes, vecPtrs);
3222   
3223   CmiTmpFree(vecPtrs); /* CmiTmp: Be sure to throw away in opposite order of allocation */
3224   CmiTmpFree(vecSizes);
3225   CmiTmpFree(msgHdr);
3226 }
3227
3228 void CmiMultipleSend(unsigned int destPE, int len, int sizes[], char *msgComps[])
3229 {
3230   _CmiMultipleSend(destPE, len, sizes, msgComps, 0);
3231 }
3232
3233 void CmiMultipleIsend(unsigned int destPE, int len, int sizes[], char *msgComps[])
3234 {
3235   _CmiMultipleSend(destPE, len, sizes, msgComps, 1);
3236 }
3237
3238 /****************************************************************************
3239 * DESCRIPTION : This function initializes the main handler required for the
3240 *               CmiMultipleSend() function to work. 
3241 *               
3242 *               This function should be called once in any Converse program
3243 *               that uses CmiMultipleSend()
3244 *
3245 ****************************************************************************/
3246
3247 static void CmiMultiMsgHandler(char *msgWhole);
3248
3249 void CmiInitMultipleSend(void)
3250 {
3251   CpvInitialize(int,CmiMainHandlerIDP); 
3252   CpvAccess(CmiMainHandlerIDP) =
3253     CmiRegisterHandler((CmiHandler)CmiMultiMsgHandler);
3254 }
3255
3256 /****************************************************************************
3257 * DESCRIPTION : This function is the main handler that splits up the messages
3258 *               CmiMultipleSend() pastes together. 
3259 *
3260 ****************************************************************************/
3261
3262 static void CmiMultiMsgHandler(char *msgWhole)
3263 {
3264   int len=((CmiMultipleSendHeader *)msgWhole)->nMessages;
3265   int offset=sizeof(CmiMultipleSendHeader);
3266   int m;
3267   for (m=0;m<len;m++) {
3268 #if CMK_USE_IBVERBS
3269     infiCmiChunkHeader *ch=(infiCmiChunkHeader *)(msgWhole+offset);
3270     char *msg=(msgWhole+offset+sizeof(infiCmiChunkHeader));
3271     int msgSize=ch->chunkHeader.size; /* Size of user portion of message (plus padding at end) */
3272     ch->chunkHeader.ref=msgWhole-msg; 
3273     ch->metaData =  registerMultiSendMesg(msg,msgSize);
3274 #else
3275     CmiChunkHeader *ch=(CmiChunkHeader *)(msgWhole+offset);
3276     char *msg=(msgWhole+offset+sizeof(CmiChunkHeader));
3277     int msgSize=ch->size; /* Size of user portion of message (plus padding at end) */
3278     ch->ref=msgWhole-msg; 
3279 #endif          
3280     /* Link new message to owner via a negative ref pointer */
3281     CmiReference(msg); /* Follows link & increases reference count of *msgWhole* */
3282     CmiSyncSendAndFree(CmiMyPe(), msgSize, msg);
3283 #if CMK_USE_IBVERBS
3284     offset+= sizeof(infiCmiChunkHeader) + msgSize;
3285 #else
3286     offset+= sizeof(CmiChunkHeader) + msgSize;
3287 #endif          
3288   }
3289   /* Release our reference to the whole message.  The message will
3290      only actually be deleted once all its sub-messages are free'd as well. */
3291   CmiFree(msgWhole);
3292 }
3293
3294 /****************************************************************************
3295 * Hypercube broadcast message passing.
3296 ****************************************************************************/
3297
3298 int HypercubeGetBcastDestinations(int mype, int total_pes, int k, int *dest_pes) {
3299   int num_pes = 0;
3300   for ( ; k>=0; --k) {
3301     /* add the processor destination at level k if it exist */
3302     dest_pes[num_pes] = mype ^ (1<<k);
3303     if (dest_pes[num_pes] >= total_pes) {
3304       /* find the first proc in the other part of the current dimention */
3305       dest_pes[num_pes] &= (-1)<<k;
3306       /* if the first proc there is over CmiNumPes() then there is no other
3307          dimension, otherwise if it is valid compute my correspondent in such
3308          a way to minimize the load for every processor */
3309       if (total_pes>dest_pes[num_pes]) dest_pes[num_pes] += (mype - (mype & ((-1)<<k))) % (total_pes - dest_pes[num_pes]);
3310       }
3311     if (dest_pes[num_pes] < total_pes) {
3312       /* if the destination is in the acceptable range increment num_pes */
3313       ++num_pes;
3314     }
3315   }
3316   return num_pes;
3317 }
3318
3319
3320 /****************************************************************************
3321 * DESCRIPTION : This function initializes the main handler required for the
3322 *               Immediate message
3323 *               
3324 *               This function should be called once in any Converse program
3325 *
3326 ****************************************************************************/
3327
3328 int _immediateLock = 0; /* if locked, all immediate message handling will be delayed. */
3329 int _immediateFlag = 0; /* if set, there is delayed immediate message. */
3330
3331 CpvDeclare(int, CmiImmediateMsgHandlerIdx); /* Main handler that is run on every node */
3332
3333 /* xdl is the real handler */
3334 static void CmiImmediateMsgHandler(char *msg)
3335 {
3336   CmiSetHandler(msg, CmiGetXHandler(msg));
3337   CmiHandleMessage(msg);
3338 }
3339
3340 void CmiInitImmediateMsg(void)
3341 {
3342   CpvInitialize(int,CmiImmediateMsgHandlerIdx); 
3343   CpvAccess(CmiImmediateMsgHandlerIdx) =
3344     CmiRegisterHandler((CmiHandler)CmiImmediateMsgHandler);
3345 }
3346
3347 /*#if !CMK_IMMEDIATE_MSG
3348 #if !CMK_MACHINE_PROGRESS_DEFINED
3349 void CmiProbeImmediateMsg()
3350 {
3351 }
3352 #endif
3353 #endif*/
3354
3355 /******** Idle timeout module (+idletimeout=30) *********/
3356
3357 typedef struct {
3358   int idle_timeout;/*Milliseconds to wait idle before aborting*/
3359   int is_idle;/*Boolean currently-idle flag*/
3360   int call_count;/*Number of timeout calls currently in flight*/
3361 } cmi_cpu_idlerec;
3362
3363 static void on_timeout(cmi_cpu_idlerec *rec,double curWallTime)
3364 {
3365   rec->call_count--;
3366   if(rec->call_count==0 && rec->is_idle==1) {
3367     CmiError("Idle time on PE %d exceeded specified timeout.\n", CmiMyPe());
3368     CmiAbort("Exiting.\n");
3369   }
3370 }
3371 static void on_idle(cmi_cpu_idlerec *rec,double curWallTime)
3372 {
3373   CcdCallFnAfter((CcdVoidFn)on_timeout, rec, rec->idle_timeout);
3374   rec->call_count++; /*Keeps track of overlapping timeout calls.*/  
3375   rec->is_idle = 1;
3376 }
3377 static void on_busy(cmi_cpu_idlerec *rec,double curWallTime)
3378 {
3379   rec->is_idle = 0;
3380 }
3381 static void CIdleTimeoutInit(char **argv)
3382 {
3383   int idle_timeout=0; /*Seconds to wait*/
3384   CmiGetArgIntDesc(argv,"+idle-timeout",&idle_timeout,"Abort if idle for this many seconds");
3385   if(idle_timeout != 0) {
3386     cmi_cpu_idlerec *rec=(cmi_cpu_idlerec *)malloc(sizeof(cmi_cpu_idlerec));
3387     _MEMCHECK(rec);
3388     rec->idle_timeout=idle_timeout*1000;
3389     rec->is_idle=0;
3390     rec->call_count=0;
3391     CcdCallOnCondition(CcdPROCESSOR_BEGIN_IDLE, (CcdVoidFn)on_idle, rec);
3392     CcdCallOnCondition(CcdPROCESSOR_BEGIN_BUSY, (CcdVoidFn)on_busy, rec);
3393   }
3394 }
3395
3396 /*****************************************************************************
3397  *
3398  * Converse Initialization
3399  *
3400  *****************************************************************************/
3401
3402 extern void CrnInit(void);
3403 extern void CmiIsomallocInit(char **argv);
3404 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
3405 void CmiIOInit(char **argv);
3406 #endif
3407
3408 /* defined in cpuaffinity.c */
3409 extern void CmiInitCPUAffinityUtil();
3410
3411 static void CmiProcessPriority(char **argv)
3412 {
3413   int dummy, nicelevel=-100;      /* process priority */
3414   CmiGetArgIntDesc(argv,"+nice",&nicelevel,"Set the process priority level");
3415   /* ignore others */
3416   while (CmiGetArgIntDesc(argv,"+nice",&dummy,"Set the process priority level"));
3417   /* call setpriority once on each process to set process's priority */
3418   if (CmiMyRank() == 0 && nicelevel != -100)  {
3419 #ifndef _WIN32
3420     if (0!=setpriority(PRIO_PROCESS, 0, nicelevel))  {
3421       CmiPrintf("[%d] setpriority failed with value %d. \n", CmiMyPe(), nicelevel);
3422       perror("setpriority");
3423       CmiAbort("setpriority failed.");
3424     }
3425     else
3426       CmiPrintf("[%d] Charm++: setpriority %d\n", CmiMyPe(), nicelevel);
3427 #else
3428     HANDLE hProcess = GetCurrentProcess();
3429     DWORD dwPriorityClass = NORMAL_PRIORITY_CLASS;
3430     char *prio_str = "NORMAL_PRIORITY_CLASS";
3431     BOOL status;
3432     /*
3433        <-20:      real time
3434        -20--10:   high 
3435        -10-0:     above normal
3436        0:         normal
3437        0-10:      below normal
3438        10-:       idle
3439     */
3440     if (0) ;
3441 #ifdef BELOW_NORMAL_PRIORITY_CLASS
3442     else if (nicelevel<10 && nicelevel>0) {
3443       dwPriorityClass = BELOW_NORMAL_PRIORITY_CLASS;
3444       prio_str = "BELOW_NORMAL_PRIORITY_CLASS";
3445     }
3446 #endif
3447     else if (nicelevel>0) {
3448       dwPriorityClass = IDLE_PRIORITY_CLASS;
3449       prio_str = "IDLE_PRIORITY_CLASS";
3450     }
3451     else if (nicelevel<=-20) {
3452       dwPriorityClass = REALTIME_PRIORITY_CLASS;
3453       prio_str = "REALTIME_PRIORITY_CLASS";
3454     }
3455 #ifdef ABOVE_NORMAL_PRIORITY_CLASS
3456     else if (nicelevel>-10 && nicelevel<0) {
3457       dwPriorityClass = ABOVE_NORMAL_PRIORITY_CLASS;
3458       prio_str = "ABOVE_NORMAL_PRIORITY_CLASS";
3459     }
3460 #endif
3461     else if (nicelevel<0) {
3462       dwPriorityClass = HIGH_PRIORITY_CLASS;
3463       prio_str = "HIGH_PRIORITY_CLASS";
3464     }
3465     status = SetPriorityClass(hProcess, dwPriorityClass);
3466     if (!status)  {
3467         int err=GetLastError();
3468         CmiPrintf("SetPriorityClass failed errno=%d, WSAerr=%d\n",errno, err);
3469         CmiAbort("SetPriorityClass failed.");
3470     }
3471     else
3472       CmiPrintf("[%d] Charm++: setpriority %s\n", CmiMyPe(), prio_str);
3473 #endif
3474   }
3475 }
3476
3477 void CommunicationServerInit()
3478 {
3479 #if CMK_IMMEDIATE_MSG
3480   CQdCpvInit();
3481   CpvInitialize(int,CmiImmediateMsgHandlerIdx); 
3482 #endif
3483 }
3484
3485
3486 static int testEndian(void)
3487 {
3488         int test=0x1c;
3489         unsigned char *c=(unsigned char *)&test;
3490         if (c[sizeof(int)-1]==0x1c)
3491                 /* Macintosh and most workstations are big-endian */
3492                 return 1;   /* Big-endian machine */
3493         if (c[0]==0x1c)
3494                 /* Intel x86 PC's, and DEC VAX are little-endian */
3495                 return 0;  /* Little-endian machine */
3496         return -2;  /*Unknown integer type */
3497 }
3498
3499 int CmiEndianness()
3500 {
3501   static int _cmi_endianness = -1;
3502   if (_cmi_endianness == -1) _cmi_endianness = testEndian();
3503   CmiAssert(_cmi_endianness != -2);
3504   return  _cmi_endianness;
3505 }
3506
3507 /**
3508   Main Converse initialization routine.  This routine is 
3509   called by the machine file (machine.c) to set up Converse.
3510   It's "Common" because it's shared by all the machine.c files. 
3511   
3512   The main task of this routine is to set up all the Cpv's
3513   (message queues, handler tables, etc.) used during main execution.
3514   
3515   On SMP versions, this initialization routine is called by 
3516   *all* processors of a node simultaniously.  It's *also* called
3517   by the communication thread, which is rather strange but needed
3518   for immediate messages.  Each call to this routine expects a 
3519   different copy of the argv arguments, so use CmiCopyArgs(argv).
3520   
3521   Requires:
3522     - A working network layer.
3523     - Working Cpv's and CmiNodeBarrier.
3524     - CthInit to already have been called.  CthInit is called
3525       from the machine layer directly, because some machine layers
3526       (like uth) use Converse threads internally.
3527
3528   Initialization is somewhat subtle, in that various modules
3529   won't work properly until they're initialized.  For example,
3530   nobody can register handlers before calling CmiHandlerInit.
3531 */
3532 void ConverseCommonInit(char **argv)
3533 {
3534
3535 /**
3536  * The reason to initialize this variable here:
3537  * cmiArgDebugFlag is possibly accessed in CmiPrintf/CmiError etc.,
3538  * therefore, we have to initialize this variable before any calls
3539  * to those functions (such as CmiPrintf). Otherwise, we may encounter
3540  * a memory segmentation fault (bad memory access). Note, even
3541  * testing CpvInitialized(cmiArgDebugFlag) doesn't help to solve
3542  * this problem because the variable indicating whether cmiArgDebugFlag is 
3543  * initialized or not is not initialized, thus possibly causing another
3544  * bad memory access. --Chao Mei
3545  */
3546   CpvInitialize(int, _urgentSend);
3547   CpvAccess(_urgentSend) = 0;
3548 #if CMK_CCS_AVAILABLE
3549   CpvInitialize(int, cmiArgDebugFlag);
3550   CpvAccess(cmiArgDebugFlag) = 0;
3551 #endif
3552   CpvInitialize(int,charmLibExitFlag);
3553   CpvAccess(charmLibExitFlag) = 0;
3554
3555   CpvInitialize(int,_curRestartPhase);
3556   CpvAccess(_curRestartPhase)=1;
3557   CmiArgInit(argv);
3558   CmiMemoryInit(argv);
3559 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
3560   CmiIOInit(argv);
3561 #endif
3562   if (CmiMyPe() == 0)
3563       CmiPrintf("Converse/Charm++ Commit ID: %s\n", CmiCommitID);
3564
3565   CpvInitialize(int, cmiMyPeIdle);
3566   CpvAccess(cmiMyPeIdle) = 0;
3567
3568 /* #if CONVERSE_POOL */
3569   CmiPoolAllocInit(30);  
3570 /* #endif */
3571   CmiTmpInit(argv);
3572   CmiTimerInit(argv);
3573   CstatsInit(argv);
3574   CmiInitCPUAffinityUtil();
3575
3576   CcdModuleInit(argv);
3577   CmiHandlerInit();
3578   CmiReductionsInit();
3579   CIdleTimeoutInit(argv);
3580   
3581 #if CMK_SHARED_VARS_POSIX_THREADS_SMP /*Used by the net-*-smp versions*/
3582   if(CmiGetArgFlagDesc(argv,"+CmiNoProcForComThread","Is there an extra processor for the communication thread on each node(only for net-smp-*) ?")){
3583     if(CmiMyRank() == 0) _Cmi_noprocforcommthread=1;
3584    }
3585 #endif
3586         
3587 #if CMK_TRACE_ENABLED
3588   traceInit(argv);
3589 /*initTraceCore(argv);*/ /* projector */
3590 #endif
3591   CmiProcessPriority(argv);
3592
3593   CmiPersistentInit();
3594   CmiIsomallocInit(argv);
3595   CmiDeliversInit();
3596   CsdInit(argv);
3597 #if CMK_CCS_AVAILABLE
3598   CcsInit(argv);
3599 #endif
3600   CpdInit();
3601   CthSchedInit();
3602   CmiGroupInit();
3603   CmiMulticastInit();
3604   CmiInitMultipleSend();
3605   CQdInit();
3606
3607   CrnInit();
3608   CmiInitImmediateMsg();
3609   CldModuleInit(argv);
3610   
3611 #if CMK_CELL
3612   void CmiInitCell();
3613   CmiInitCell();
3614 #endif
3615
3616 #ifdef CMK_CUDA
3617   initHybridAPI(CmiMyPe()); 
3618 #endif
3619
3620   /* main thread is suspendable */
3621 /*
3622   CthSetSuspendable(CthSelf(), 0);
3623 */
3624
3625 #if CMK_BIGSIM_CHARM
3626    /* have to initialize QD here instead of _initCharm */
3627   extern void initQd(char **argv);
3628   initQd(argv);
3629 #endif
3630 }
3631
3632 void ConverseCommonExit(void)
3633 {
3634   CcsImpl_kill();
3635
3636 #if CMK_TRACE_ENABLED
3637   traceClose();
3638 /*closeTraceCore();*/ /* projector */
3639 #endif
3640
3641 #if CMI_IO_BUFFER_EXPLICIT
3642   CmiFlush(stdout);  /* end of program, always flush */
3643 #endif
3644   CmiFlush(stdout);  /* end of program, always flush */
3645
3646 #if CMK_CELL
3647   CloseOffloadAPI();
3648 #endif
3649
3650 #if CMK_CUDA
3651   exitHybridAPI(); 
3652 #endif
3653   seedBalancerExit();
3654   EmergencyExit();
3655 }
3656
3657
3658 #if CMK_CELL != 0
3659
3660 extern void register_accel_spe_funcs(void);
3661
3662 void CmiInitCell()
3663 {
3664   // Create a unique string for each PPE to use for the timing
3665   //   data file's name
3666   char fileNameBuf[64];
3667   sprintf(fileNameBuf, "speTiming.%d", CmiMyPe());
3668
3669   InitOffloadAPI(offloadCallback, NULL, NULL, fileNameBuf);
3670   //CcdCallOnConditionKeep(CcdPERIODIC, 
3671   //      (CcdVoidFn) OffloadAPIProgress, NULL);
3672   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
3673       (CcdVoidFn) OffloadAPIProgress, NULL);
3674
3675   // Register accelerated entry methods on the PPE
3676   register_accel_spe_funcs();
3677 }
3678
3679 #include "cell-api.c"
3680
3681 #endif
3682
3683 /****
3684  * CW Lee - 9/14/2005
3685  * Added a mechanism to allow some control over machines with extremely
3686  * inefficient terminal IO mechanisms. Case in point: the XT3 has a
3687  * 20ms flush overhead along with about 25MB/s bandwidth for IO. This,
3688  * coupled with a default setup using unbuffered stdout introduced
3689  * severe overheads (and hence limiting scaling) for applications like 
3690  * NAMD.
3691  */
3692 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
3693 void CmiIOInit(char **argv) {
3694   CpvInitialize(int, expIOFlushFlag);
3695 #if CMI_IO_BUFFER_EXPLICIT
3696   /* 
3697      Support for an explicit buffer only makes sense if the machine
3698      layer does not wish to make its own implementation.
3699
3700      Placing this after CmiMemoryInit() means that CmiMemoryInit()
3701      MUST NOT make use of stdout if an explicit buffer is requested.
3702
3703      The setvbuf function may only be used after opening a stream and
3704      before any other operations have been performed on it
3705   */
3706   CpvInitialize(char*, explicitIOBuffer);
3707   CpvInitialize(int, expIOBufferSize);
3708   if (!CmiGetArgIntDesc(argv,"+io_buffer_size", &CpvAccess(expIOBufferSize),
3709                         "Explicit IO Buffer Size")) {
3710     CpvAccess(expIOBufferSize) = DEFAULT_IO_BUFFER_SIZE;
3711   }
3712   if (CpvAccess(expIOBufferSize) <= 0) {
3713     CpvAccess(expIOBufferSize) = DEFAULT_IO_BUFFER_SIZE;
3714   }
3715   CpvAccess(explicitIOBuffer) = (char*)CmiAlloc(CpvAccess(expIOBufferSize)*
3716                                                 sizeof(char));
3717   if (setvbuf(stdout, CpvAccess(explicitIOBuffer), _IOFBF, 
3718               CpvAccess(expIOBufferSize))) {
3719     CmiAbort("Explicit IO Buffering failed\n");
3720   }
3721 #endif
3722 #if CMI_IO_FLUSH_USER
3723   /* system default to have user control flushing of IO */
3724   /* Now look for user override */
3725   CpvAccess(expIOFlushFlag) = !CmiGetArgFlagDesc(argv,"+io_flush_system",
3726                                                  "System Controls IO Flush");
3727 #else
3728   /* system default to have system handle IO flushing */
3729   /* Now look for user override */
3730   CpvAccess(expIOFlushFlag) = CmiGetArgFlagDesc(argv,"+io_flush_user",
3731                                                 "User Controls IO Flush");
3732 #endif
3733 }
3734 #endif
3735
3736 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
3737
3738 void CmiPrintf(const char *format, ...)
3739 {
3740   CpdSystemEnter();
3741   {
3742   va_list args;
3743   va_start(args,format);
3744   vfprintf(stdout,format, args);
3745   if (CpvInitialized(expIOFlushFlag) && !CpvAccess(expIOFlushFlag)) {
3746     CmiFlush(stdout);
3747   }
3748   va_end(args);
3749 #if CMK_CCS_AVAILABLE
3750   if (CpvAccess(cmiArgDebugFlag)) {
3751     va_start(args,format);
3752     print_node0(format, args);
3753     va_end(args);
3754   }
3755 #endif
3756   }
3757   CpdSystemExit();
3758 }
3759
3760 void CmiError(const char *format, ...)
3761 {
3762   CpdSystemEnter();
3763   {
3764   va_list args;
3765   va_start(args,format);
3766   vfprintf(stderr,format, args);
3767   CmiFlush(stderr);  /* stderr is always flushed */
3768   va_end(args);
3769 #if CMK_CCS_AVAILABLE
3770   if (CpvAccess(cmiArgDebugFlag)) {
3771     va_start(args,format);
3772     print_node0(format, args);
3773     va_end(args);
3774   }
3775 #endif
3776   }
3777   CpdSystemExit();
3778 }