major changes and ported to crayXT.
authorGengbin Zheng <gzheng@illinois.edu>
Thu, 22 Oct 2009 15:27:44 +0000 (15:27 +0000)
committerGengbin Zheng <gzheng@illinois.edu>
Thu, 22 Oct 2009 15:27:44 +0000 (15:27 +0000)
1. modernized machine interface
2. remove processor number limit due to locks being as globals.
3. add memory management code (mem-arena) for symmetric heap, it manages a big chunk of memory in isomalloc-like way.

src/arch/shmem/machine.c

index 6be498106f9f1cd0a34faa205f1a5263065951af..dca5e4101f77d56db18085c2ebeff9858b61ead1 100644 (file)
@@ -18,6 +18,9 @@
 #include <unistd.h>
 #include <malloc.h>
 #include "converse.h"
+
+#include "mem-arena.h"
+
 #include CMK_SHMEM_H
 
 #if !CMK_SHMEM_T3E
@@ -28,7 +31,7 @@
  *  We require statically allocated variables for locks.  This defines
  *  the max number of processors available.
  */
-#define MAX_PES 2048
+#define MAX_PES 4096
 
 /*
  * Some constants
@@ -147,12 +150,17 @@ static McQueue *broadcast_tmp_queue;
  */
 static McDistList head;
 
+#if CMK_ARENA_MALLOC
+/* lock can be in symmetric heap */
+static long *my_lock;
+static long *head_lock;
+static long *bcast_lock;
+#else
 /* Static variables are necessary for locks. */
 static long *my_lock;
 static long head_lock[MAX_PES];
 static long bcast_lock[MAX_PES];
-
-#define ALIGN8(x)  (8*(((x)+7)/8))
+#endif
 
 int McChecksum(char *msg, int size)
 {
@@ -165,6 +173,12 @@ int McChecksum(char *msg, int size)
   return chksm;
 }
 
+void CmiPushPE(int pe,void *msg)
+{
+  CmiAbort("Not implemented!");
+  /* McEnqueueRemote(msg,ALIGN8(size),pe); */
+}
+
 /**********************************************************************
  *  CMI Functions START HERE
  */
@@ -438,6 +452,7 @@ void CmiFreeMulticastFn(CmiGroup grp, int size, char *msg)
 void CmiAbort(const char *message)
 {
   CmiError(message);
+  /* *(char*)NULL = 0; */
   globalexit(1);
 }
 
@@ -457,12 +472,82 @@ void *CmiGetNonLocal()
   return (void *)McQueueRemoveFromFront(received_queue);
 }
 
+static char     **Cmi_argv;
+static char     **Cmi_argvcopy;
+static CmiStartFn Cmi_startfn;   /* The start function */
+static int        Cmi_usrsched;  /* Continue after start function finishes? */
+
+static void CommunicationServerThread(int sleepTime)
+{
+#if CMK_SMP
+  CommunicationServer(sleepTime);
+#endif
+#if CMK_IMMEDIATE_MSG
+  CmiHandleImmediate();
+#endif
+}
+
+static void ConverseRunPE(int everReturn)
+{
+  char** CmiMyArgv;
+
+  if (CmiMyRank())
+    CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
+  else
+    CmiMyArgv=Cmi_argv;
+
+  CthInit(CmiMyArgv);
+
+  ConverseCommonInit(CmiMyArgv);
+
+
+  /* communication thread */
+  if (CmiMyRank() == CmiMyNodeSize()) {
+    Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
+    while (1) CommunicationServerThread(5);
+  }
+  else {  /* worker thread */
+  if (!everReturn) {
+    Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
+    if (Cmi_usrsched==0) CsdScheduler(-1);
+    ConverseExit();
+  }
+  }
+}
+
+void arena_init();
+
 void 
 ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
 {
+  Cmi_argvcopy = CmiCopyArgs(argv);
+  Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
+
   McInit();
-  CthInit(argv);
-  ConverseCommonInit(argv);
+#if CMK_ARENA_MALLOC
+  arena_init();
+#endif
+
+  {
+  int debug = CmiGetArgFlag(argv,"++debug");
+  int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
+  if (debug || debug_no_pause)
+  {   /*Pause so user has a chance to start and attach debugger*/
+#if CMK_HAS_GETPID
+    printf("CHARMDEBUG> Processor %d has PID %d\n",CmiMyNode(),getpid());
+    fflush(stdout);
+    if (!debug_no_pause)
+      sleep(10);
+#else
+    printf("++debug ignored.\n");
+#endif
+  }
+  }
+
+  /* CmiStartThreads(argv); */
+  ConverseRunPE(initret);
+
+/*
   for(argc=0;argv[argc];argc++);
   if (initret==0)
   {
@@ -470,6 +555,7 @@ ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
     if (usched==0) CsdScheduler(-1);
     ConverseExit();
   }
+*/
 }
 
 void ConverseExit()
@@ -480,6 +566,9 @@ void ConverseExit()
   }
 #endif
   ConverseCommonExit();
+#if CMK_CRAYXT
+  shmem_finalize();
+#endif
   exit(0);
 }
 
@@ -511,8 +600,25 @@ void clear_lock(long *lock, int pe)
 }
 
 #else
+
+#if 1
 #define set_lock(lock, pe)  shmem_set_lock(lock)
 #define clear_lock(lock, pe)  shmem_clear_lock(lock)
+#else              /* for debugging */
+void set_lock(long *lock, int pe)
+{
+  //printf("[%d] set_lock %d %d\n", CmiMyPe(), pe, *lock);
+  shmem_set_lock(lock);
+  // while (shmem_test_lock(lock)) usleep(1);
+  //printf("[%d] after set_lock %d %d\n", CmiMyPe(), pe, *lock);
+}
+void clear_lock(long *lock, int pe)
+{
+  //printf("[%d] free_lock %d %d\n", CmiMyPe(), pe, *lock);
+  shmem_clear_lock(lock);
+  //printf("[%d] after free_lock %d %d\n", CmiMyPe(), pe, *lock);
+}
+#endif
 #endif
 
 /**********************************************************************
@@ -524,12 +630,18 @@ static void McInit(void)
 {
   CMK_SHMEM_INIT;
 
-  CpvInitialize(void *, CmiLocalQueue);
-  CpvAccess(CmiLocalQueue) = CdsFifo_Create();
+#if !CMK_CRAYXT
   _Cmi_mype = _my_pe();
   _Cmi_numpes = _num_pes();
+#else
+  _Cmi_mype = shmem_my_pe();
+  _Cmi_numpes = shmem_n_pes();
+#endif
   _Cmi_myrank = 0;
 
+  CpvInitialize(void *, CmiLocalQueue);
+  CpvAccess(CmiLocalQueue) = CdsFifo_Create();
+
   McInitList();
 }
 
@@ -548,23 +660,31 @@ static void McInitList(void)
   head.nxt_node = list_empty;
   head.nxt_addr = NULL;
   head.msg_sz = 0;
+
+#if CMK_ARENA_MALLOC
+  head_lock = shmalloc(sizeof(long)*_Cmi_numpes);
+  _MEMCHECK(head_lock);
+  bcast_lock = shmalloc(sizeof(long)*_Cmi_numpes);
+  _MEMCHECK(bcast_lock);
+#else
   if (_Cmi_numpes > MAX_PES)
   {
     CmiPrintf("Not enough processors allocated in machine.c.\n");
-    CmiPrintf("Change MAX_PES in t3e/machine.c to at least %d and recompile Converse\n",
+    CmiPrintf("Change MAX_PES in machine.c to at least %d and recompile Converse\n",
     _Cmi_numpes);
   }
+#endif
   for(i=0; i < _Cmi_numpes; i++)
   {
     head_lock[i] = 0;
     bcast_lock[i] = 0;
   }
   my_lock = &(head_lock[_Cmi_mype]);
-  shmem_barrier_all();
 /*
   clear_lock(my_lock, _Cmi_mype);
   clear_lock(&bcast_lock[_Cmi_mype], _Cmi_mype);
 */
+  shmem_barrier_all();
 }
 
 static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe)
@@ -900,87 +1020,104 @@ static void * McQueueRemoveFromFront(McQueue *queue)
   return element;
 }
 
+#if CMK_ARENA_MALLOC
 
-/* Timer Functions */
-#if 0
-
-static double clocktick;
-static long inittime_wallclock;
-static long inittime_virtual;
-
-void CmiTimerInit()
-{
-  inittime_wallclock = _rtc();
-  inittime_virtual = cpused();
-  clocktick = 1.0 / (sysconf(_SC_CLK_TCK));
-}
+static char *arena = NULL;
+static slotset *myss = NULL;
+static int slotsize=1*1024;
 
-double CmiWallTimer()
-{
-  long now;
+typedef struct ArenaBlock {
+      CmiInt8 slot;   /* First slot */
+      CmiInt8 length; /* Length, in bytes*/
+} ArenaBlock;
 
-  now = _rtc();
-  return (clocktick * (now - inittime_wallclock));
+/* Convert a heap block pointer to/from a CmiIsomallocBlock header */
+static void *block2pointer(ArenaBlock *blockHeader) {
+        return (void *)(blockHeader+1);
 }
-
-double CmiCpuTimer()
-{
-  long now;
-
-  now = cpused();
-  return (clocktick * (now - inittime_virtual));
+static ArenaBlock *pointer2block(void *heapBlock) {
+        return ((ArenaBlock *)heapBlock)-1;
 }
 
-double CmiTimer()
-{
-  long now;
-
-  now = _rtc();
-  return (clocktick * (now - inittime_wallclock));
+/* Return the number of slots in a block with n user data bytes */
+static int length2slots(int nBytes) {
+        return (sizeof(ArenaBlock)+nBytes+slotsize-1)/slotsize;
 }
 
-#elif CMK_TIMER_USE_CLOCK_GETTIME
+#define MAX_MEM    (64*1024*1024)        
 
-CpvStaticDeclare(double,inittime_wallclock);
-CpvStaticDeclare(double,inittime_virtual);
-
-void CmiTimerInit(void)
+void arena_init()
 {
-  struct timespec temp;
-  CpvInitialize(double, inittime_wallclock);
-  CpvInitialize(double, inittime_virtual);
-  clock_gettime(CLOCK_SGI_CYCLE, &temp);
-  CpvAccess(inittime_wallclock) = (double) temp.tv_sec +
-                                  1e-9 * temp.tv_nsec;
-  CpvAccess(inittime_virtual) = CpvAccess(inittime_wallclock);
+  size_t maxmem = 0;
+  int nslots;
+  char *s;
+#if CMK_CRAYXT
+  if (s = getenv("XT_SYMMETRIC_HEAP_SIZE")) {
+    size_t n=0;
+    switch (s[strlen(s)-1]) {
+    case 'G':  {
+      sscanf(s, "%dG", &n);  n *= 1024*1024*1024; 
+      break;
+      }
+    case 'M': {
+      sscanf(s, "%dM", &n);  n *= 1024*1024; 
+      break;
+      }
+    case 'K': {
+      sscanf(s, "%dK", &n);  n *= 1024; 
+      break;
+      }
+    default: {
+      n =atoi(s);
+      break;
+      }
+    }
+    if (n>0) {
+      n -= sizeof(long)*2*CmiNumPes();    /* for locks */
+      n -= 2*1024*1024;                   /* less 2MB */
+      if (n>0) maxmem = n;    /* a little less */
+    }
+  }
+#endif
+  if (maxmem == 0) maxmem = MAX_MEM;
+  if (CmiMyPe()==0) CmiPrintf("Charm++> Total of %dMB symmetric heap memory allocated.\n", maxmem/1024/1024);
+  arena = shmalloc(maxmem);           /* global barrier */
+  _MEMCHECK(arena);
+  nslots = maxmem/slotsize;
+  myss = new_slotset(0, nslots);
 }
 
-double CmiWallTimer(void)
-{
-  struct timespec temp;
-  double currenttime;
-
-  clock_gettime(CLOCK_SGI_CYCLE, &temp); 
-  currenttime = (double) temp.tv_sec +
-                1e-9 * temp.tv_nsec;
-  return (currenttime - CpvAccess(inittime_wallclock));
+void *arena_malloc(int size) 
+{   
+        CmiInt8 s,n;
+        ArenaBlock *blk;
+        if (size==0) return NULL;
+        n=length2slots(size);
+        /*Always satisfy mallocs with local slots:*/
+        s=get_slots(myss,n);
+        if (s==-1) {
+                CmiError("Not enough address space left in shmem on processor %d for %d bytes!\n", 
+                         CmiMyPe(),size);
+                CmiAbort("Out of symmetric heap space for arena_malloc");
+        }
+       grab_slots(myss,s,n);
+       blk = (ArenaBlock*)(arena + s*slotsize);
+        blk->slot=s;
+        blk->length=size;
+        return block2pointer(blk);
 }
 
-double CmiCpuTimer(void)
-{
-  struct timespec temp;
-  double currenttime;
-
-  clock_gettime(CLOCK_SGI_CYCLE, &temp);
-  currenttime = (double) temp.tv_sec +
-                1e-9 * temp.tv_nsec;
-  return (currenttime - CpvAccess(inittime_virtual));
+void arena_free(void *blockPtr)
+{   
+        ArenaBlock *blk;
+        CmiInt8 s,n;
+        if (blockPtr==NULL) return;
+        blk = pointer2block(blockPtr);
+        s=blk->slot;  
+        n=length2slots(blk->length); 
+        free_slots(myss, s, n);
 }
 
-double CmiTimer(void)
-{
-  return CmiCpuTimer();
-}              
 #endif
 
 /*   Memory lock and unlock functions */
@@ -991,4 +1128,14 @@ static volatile int memflag;
 void CmiMemLock() { memflag=1; }
 void CmiMemUnlock() { memflag=0; }
 
+int CmiBarrier()
+{
+  return -1;
+}
+
+int CmiBarrierZero()
+{
+  return -1;
+}
+
 /*@}*/