Merge branch 'charm' of charmgit:charm into charm
authorEsteban Meneses <emenese2@illinois.edu>
Thu, 3 May 2012 14:49:14 +0000 (09:49 -0500)
committerEsteban Meneses <emenese2@illinois.edu>
Thu, 3 May 2012 14:49:14 +0000 (09:49 -0500)
src/arch/gemini_gni-crayxe/conv-mach-papi.h [new file with mode: 0644]
src/arch/gemini_gni-crayxe/conv-mach-papi.sh [new file with mode: 0644]
src/arch/gemini_gni-crayxe/conv-mach.h
src/ck-core/cklocation.C
src/ck-core/init.C
src/ck-ldb/CommAwareRefineLB.C
src/ck-perf/trace-projections.C
src/ck-perf/trace-projections.h
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h
src/libs/ck-libs/cache/CkCache.ci

diff --git a/src/arch/gemini_gni-crayxe/conv-mach-papi.h b/src/arch/gemini_gni-crayxe/conv-mach-papi.h
new file mode 100644 (file)
index 0000000..68f51a1
--- /dev/null
@@ -0,0 +1,6 @@
+// Chee Wai 3/11/2004
+// This is really stupid, instead of being able to say "#include <papi.h>"
+// here, I am forced to not say anything and wait for the configure script
+// to decide if the library exists before adding the line to conv-mach-opt.h
+//
+// If this is not the intended idiom, then please enlighten me.
diff --git a/src/arch/gemini_gni-crayxe/conv-mach-papi.sh b/src/arch/gemini_gni-crayxe/conv-mach-papi.sh
new file mode 100644 (file)
index 0000000..518f895
--- /dev/null
@@ -0,0 +1,10 @@
+CMK_USE_PAPI=true
+USE_SPP_PAPI=true
+#you should run module load papi
+PAPI_LIBDIR="/opt/cray/papi/4.3.0.1/perf_events/no-cuda/lib"
+PAPI_INCDIR="/opt/cray/papi/4.3.0.1/perf_events/no-cuda/include"
+CMK_INCDIR="$CMK_INCDIR -I$PAPI_INCDIR"
+CMK_LIBDIR="-L $PAPI_LIBDIR"
+CMK_LD="$CMK_LD -Wl,-rpath,$PAPI_LIBDIR"
+CMK_LDXX="$CMK_LDXX -Wl,-rpath,$PAPI_LIBDIR" 
+CMK_LIBS="$CMK_LIBS -lpapi"
index d007e19209bba49568b3e77c72a33e000daff7b5..d531d633286dd0f52919acd7f3b333e333ba2185 100644 (file)
@@ -3,6 +3,9 @@
 
 #define CMK_CRAYXE                                         1
 
+// for cray xe we use the known conflict free counter set from the SPP project
+#define USE_SPP_PAPI                                       1
+
 #define XE6_TOPOLOGY                                      1
 
 /* 1 if the machine has a function called "getpagesize()", 0 otherwise .
index 82872b3d38e2bba1b35f83f137dfe5ee49d9f4ed..97b34ef20826ac937202e0f15f6e4df7cf9ae6c1 100644 (file)
@@ -2633,6 +2633,12 @@ void CkLocMgr::emigrate(CkLocRec_local *rec,int toPe)
 #if !defined(_FAULT_MLOG_)    
        informHome(idx,toPe);
 #endif
+
+#if CMK_GLOBAL_LOCATION_UPDATE
+        DEBM((AA"Global location update! idx %s to %d \n"AB,idx2str(idx),toPe));
+        thisProxy.updateLocation(idx, toPe);                        
+#endif
+
        CK_MAGICNUMBER_CHECK
 }
 
index 84485b7332322742eea3ba482c0cc3dc4a1d02d2..be1f834150816d532163da5f9addd3a4b58f050c 100644 (file)
@@ -1255,23 +1255,33 @@ void _initCharm(int unused_argc, char **argv)
 #endif
         }
         CmiInitCPUTopology(argv);
+#if CMK_SHARED_VARS_POSIX_THREADS_SMP
+        if (CmiCpuTopologyEnabled()) {
+            int *pelist;
+            int num;
+            CmiGetPesOnPhysicalNode(0, &pelist, &num);
+            if (CkMyPe()==0 && !_Cmi_noprocforcommthread && num+num/CmiMyNodeSize() > CmiNumCores()) {
+                CkPrintf("\nCharm++> Warning: the number of SMP threads is greater than the number of physical cores, use +CmiNoProcForComThread runtime option.\n\n");
+            }
+        }
+#endif
     }
 
-       if(CmiMyPe() == 0) {
+    if(CmiMyPe() == 0) {
         char *topoFilename;
-            if(CmiGetArgStringDesc(argv,"+printTopo",&topoFilename,"topo file name")) {
-               TopoManager tmgr;
-    FILE *fp;
-    fp = fopen(topoFilename, "w");
-    if (fp == NULL) {
-      CkPrintf("Error opening topology.Info.txt file, writing to stdout\n");
-      fp = stdout;
+        if(CmiGetArgStringDesc(argv,"+printTopo",&topoFilename,"topo file name")) 
+        {
+           TopoManager tmgr;
+            FILE *fp;
+            fp = fopen(topoFilename, "w");
+            if (fp == NULL) {
+              CkPrintf("Error opening topology.Info.txt file, writing to stdout\n");
+              fp = stdout;
+            }
+           tmgr.printAllocation(fp);
+            fclose(fp);
+        }
     }
-               tmgr.printAllocation(fp);
-    fclose(fp);
-           }
-       }
-
 
 #if CMK_USE_PXSHM && CMK_CRAYXE && CMK_SMP
       // for SMP on Cray XE6 (hopper) it seems pxshm has to be initialized
index 71498fc8db8d3a06d5b05ee80fb66e667df74edf..33d88b1ad21d225b03df9e678fde5f3483408174 100644 (file)
@@ -29,7 +29,7 @@
 
 #include <time.h>
 
-#define THRESHOLD 0.1
+#define THRESHOLD 0.02
 #define SWAP_MULTIPLIER 5 
 
 inline void eraseObjFromParrObjs(std::vector<int> & parr_objs, int remove_objid);
@@ -40,6 +40,8 @@ inline void handleTransfer(int randomly_obj_id, ProcInfo& p, int possible_pe, st
 inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold, double lower_threshold,
                            std::vector<int> &parr_above_avg, std::vector<int> &parr_below_avg,
                            bool* proc_load_info, ProcArray *parr);
+inline void getPossiblePes(std::vector<int>& possible_pes, int randomly_obj_id,
+    ObjGraph *ogr, ProcArray* parr);
 
 double upper_threshold;
 double lower_threshold;
@@ -145,12 +147,10 @@ void PrintProcObj(ProcArray *parr, std::vector<int>* parr_objs) {
 }
 
 
-void CommAwareRefineLB::work(LDStats* stats)
-{
+void CommAwareRefineLB::work(LDStats* stats) {
   /** ========================== INITIALIZATION ============================= */
   ProcArray *parr = new ProcArray(stats);       // Processor Array
   ObjGraph *ogr = new ObjGraph(stats);          // Object Graph
-  std::vector<int> vertexid_pe(ogr->vertices.size()); // Stores the cur pe of obj
   double avgload = parr->getAverageLoad();      // Average load of processors
 
   // Sets to false if it is overloaded, else to true
@@ -161,11 +161,9 @@ void CommAwareRefineLB::work(LDStats* stats)
   // that processor
   std::vector<int>* parr_objs = new std::vector<int>[parr->procs.size()];
 
-  // Create an array for each chare with processor communication info
-  ObjPeCommInfo* objpcomm = new ObjPeCommInfo[ogr->vertices.size()];
-
   upper_threshold = avgload + (avgload * THRESHOLD);
-  lower_threshold = avgload - (avgload * THRESHOLD * THRESHOLD);
+  //lower_threshold = avgload - (avgload * THRESHOLD * THRESHOLD);
+  lower_threshold = avgload;
 
   int less_loaded_counter = 0;
 
@@ -184,14 +182,13 @@ void CommAwareRefineLB::work(LDStats* stats)
   for(vert = 0; vert < ogr->vertices.size(); vert++) {
     curr_pe = ogr->vertices[vert].getCurrentPe();
     parr_objs[curr_pe].push_back(vert);
-    vertexid_pe[ogr->vertices[vert].getVertexId()] = curr_pe;
+    ogr->vertices[vert].setNewPe(curr_pe);
   }
 
-
   std::vector<int> parr_above_avg;
   std::vector<int> parr_below_avg;
 
-  double pe_load;
+  double pe_load;  
 
   // Insert into parr_above_avg if the processor fits under the criteria of
   // overloaded processor.
@@ -211,53 +208,6 @@ void CommAwareRefineLB::work(LDStats* stats)
   std::make_heap(parr_above_avg.begin(), parr_above_avg.end(),
       ProcLoadGreater(parr));
 
-  // Construct the datastructure that stores communication information for each
-  // chare with respect to processors
-  for (vert = 0; vert < ogr->vertices.size(); vert++) {
-    std::map<int, int> tmp_map_pid_index;
-    int counter = objpcomm[vert].pcomm.size();
-    int index;
-    for (i = 0; i < ogr->vertices[vert].sendToList.size(); i++) {
-      j = vertexid_pe[ogr->vertices[vert].sendToList[i].getNeighborId()];
-      // TODO: Should it index with vertexId?
-      if (tmp_map_pid_index.count(j) == 0) {
-        tmp_map_pid_index[j] = counter;
-        PeCommInfo pecomminf(j);
-        // TODO: Shouldn't it use vertexId instead of vert?
-        objpcomm[vert].pcomm.push_back(pecomminf);
-        counter++;
-      }
-      index = tmp_map_pid_index[j];
-    
-      objpcomm[vert].pcomm[index].num_msg +=
-        ogr->vertices[vert].sendToList[i].getNumMsgs();
-      objpcomm[vert].pcomm[index].num_bytes +=
-        ogr->vertices[vert].sendToList[i].getNumBytes();
-    }
-
-    for (i = 0; i < ogr->vertices[vert].recvFromList.size(); i++) {
-      j = vertexid_pe[ogr->vertices[vert].recvFromList[i].getNeighborId()];
-
-      if (tmp_map_pid_index.count(j) == 0) {
-        tmp_map_pid_index[j] = counter;
-        PeCommInfo pecomminf(j);
-        // TODO: Shouldn't it use vertexId instead of vert?
-        objpcomm[vert].pcomm.push_back(pecomminf);
-        counter++;
-      }
-      index = tmp_map_pid_index[j];
-
-      objpcomm[vert].pcomm[index].num_msg +=
-        ogr->vertices[vert].sendToList[i].getNumMsgs();
-      objpcomm[vert].pcomm[index].num_bytes +=
-        ogr->vertices[vert].sendToList[i].getNumBytes();
-    }
-
-    // Sort the pe communication vector for this chare
-    std::sort(objpcomm[vert].pcomm.begin(), objpcomm[vert].pcomm.end(),
-        ProcCommGreater());
-  }
-
   int random;
   int randomly_obj_id;
   bool obj_allocated;
@@ -269,88 +219,91 @@ void CommAwareRefineLB::work(LDStats* stats)
 
   // Keep on loadbalancing until the number of above avg processors is 0
   while (parr_above_avg.size() != 0 && total_swaps > 0 && parr_below_avg.size() != 0) {
-      // CkPrintf("Above avg : %d Below avg : %d Total swaps: %d\n", parr_above_avg.size(),
-      //    parr_below_avg.size(), total_swaps);
-      obj_allocated = false;
-      num_tries = 0;
-
-      // Pop the heaviest processor
-      int p_index = popFromProcHeap(parr_above_avg, parr);
-      ProcInfo& p = parr->procs[p_index];
+    // CkPrintf("Above avg : %d Below avg : %d Total swaps: %d\n", parr_above_avg.size(),
+    //    parr_below_avg.size(), total_swaps);
+    obj_allocated = false;
+    num_tries = 0;
+
+    // Pop the heaviest processor
+    int p_index = popFromProcHeap(parr_above_avg, parr);
+    ProcInfo& p = parr->procs[p_index];
+
+    while (!obj_allocated && num_tries < parr_objs[p.getProcId()].size()) {
+
+      // It might so happen that due to overhead load, it might not have any
+      // more objects in its list
+      if (parr_objs[p.getProcId()].size() == 0) {
+        // CkPrintf("No obj left to be allocated\n");
+        obj_allocated = true;
+        break;
+      }
 
-      while (!obj_allocated && num_tries < 5) {
+      int randd = rand();
+      random = randd % parr_objs[p.getProcId()].size();
+      randomly_obj_id = parr_objs[p.getProcId()][random];
+      obj_load = ogr->vertices[randomly_obj_id].getVertexLoad();
 
-        // It might so happen that due to overhead load, it might not have any
-        // more objects in its list
-        if (parr_objs[p.getProcId()].size() == 0) {
-          // CkPrintf("No obj left to be allocated\n");
-          obj_allocated = true;
-          break;
-        }
+      // CkPrintf("Heavy %d: Parr obj size : %d random : %d random obj id : %d\n", p_index,
+      //     parr_objs[p.getProcId()].size(), randd, randomly_obj_id);
+      std::vector<int> possible_pes;
+      getPossiblePes(possible_pes, randomly_obj_id, ogr, parr);
+      for (i = 0; i < possible_pes.size(); i++) {
 
-        int randd = rand();
-        random = randd % parr_objs[p.getProcId()].size();
-        randomly_obj_id = parr_objs[p.getProcId()][random];
-        obj_load = ogr->vertices[randomly_obj_id].getVertexLoad();
+        // If the heaviest communicating processor is there in the list, then
+        // assign it to that.
+        possible_pe = possible_pes[i];
 
-        // CkPrintf("Heavy %d: Parr obj size : %d random : %d random obj id : %d\n", p_index,
-        //     parr_objs[p.getProcId()].size(), randd, randomly_obj_id);
-        for (i = 0; i < objpcomm[randomly_obj_id].pcomm.size(); i++) {
+        if ((parr->procs[possible_pe].getTotalLoad() + obj_load) < upper_threshold) {
+         // CkPrintf("**  Transfered %d(Load %lf) from %d:%d(Load %lf) to %d:%d(Load %lf)\n",
+         //     randomly_obj_id, obj_load, CkNodeOf(p.getProcId()), p.getProcId(), p.getTotalLoad(),
+         //     CkNodeOf(possible_pe), possible_pe,
+         //     parr->procs[possible_pe].getTotalLoad());
 
-          // If the heaviest communicating processor is there in the list, then
-          // assign it to that.
-          possible_pe = objpcomm[randomly_obj_id].pcomm[i].pe_id;
+          handleTransfer(randomly_obj_id, p, possible_pe, parr_objs, ogr, parr);
+          obj_allocated = true;
+          total_swaps--;
+          updateLoadInfo(p_index, possible_pe, upper_threshold, lower_threshold,
+              parr_above_avg, parr_below_avg, proc_load_info, parr);
 
-          if ((parr->procs[possible_pe].getTotalLoad() + obj_load) < upper_threshold) {
-            //CkPrintf("** Iter[%d] : Transfered %d from %d : Load %E to %d : Load %E Comm : %d\n",
-            //    i, randomly_obj_id, p.getProcId(), p.getTotalLoad(),
-            //    possible_pe, parr->procs[possible_pe].getTotalLoad(),
-            //    objpcomm[randomly_obj_id].pcomm[i].num_msg);
+          break;
+        }
+      }
 
-            handleTransfer(randomly_obj_id, p, possible_pe, parr_objs, ogr, parr);
+      // Since there is no processor in the least loaded list with which this
+      // chare communicates, pick a random least loaded processor.
+      if (!obj_allocated) {
+        //CkPrintf(":( Could not transfer to the nearest communicating ones\n");
+        for (int x = 0; x < parr_below_avg.size(); x++) {
+          int random_pe = parr_below_avg[x];
+          if ((parr->procs[random_pe].getTotalLoad() + obj_load) < upper_threshold) {
             obj_allocated = true;
             total_swaps--;
-            updateLoadInfo(p_index, possible_pe, upper_threshold, lower_threshold,
+            handleTransfer(randomly_obj_id, p, random_pe, parr_objs, ogr, parr);
+            updateLoadInfo(p_index, random_pe, upper_threshold, lower_threshold,
                 parr_above_avg, parr_below_avg, proc_load_info, parr);
-
             break;
           }
-        }
-
-        // Since there is no processor in the least loaded list with which this
-        // chare communicates, pick a random least loaded processor.
-        if (!obj_allocated) {
-          // CkPrintf(":( Could not transfer to the nearest communicating ones\n");
-          for (int x = 0; x < parr_below_avg.size(); x++) {
-            int random_pe = parr_below_avg[x];
-            if ((parr->procs[random_pe].getTotalLoad() + obj_load) < upper_threshold) {
-              obj_allocated = true;
-              total_swaps--;
-              handleTransfer(randomly_obj_id, p, random_pe, parr_objs, ogr, parr);
-              updateLoadInfo(p_index, random_pe, upper_threshold, lower_threshold,
-                  parr_above_avg, parr_below_avg, proc_load_info, parr);
-              break;
-            }
-            num_tries++;
-          }
+          num_tries++;
         }
       }
+    }
 
-      if (!obj_allocated) {
-        CkPrintf("!!!! Could not handle the heavy proc %d so giving up\n", p_index);
-       // parr_above_avg.push_back(p_index);
-       // std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
-       //     ProcLoadGreater(parr));
-      }
+    if (!obj_allocated) {
+      //CkPrintf("!!!! Could not handle the heavy proc %d so giving up\n", p_index);
+      // parr_above_avg.push_back(p_index);
+      // std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
+      //     ProcLoadGreater(parr));
     }
+  }
 
+  //CkPrintf("CommAwareRefine> After lb max load: %lf avg load: %lf\n", max_load, avg_load/parr->procs.size());
 
   /** ============================== CLEANUP ================================ */
   ogr->convertDecisions(stats);         // Send decisions back to LDStats
   delete parr;
   delete ogr;
+  delete proc_load_info;
   delete[] parr_objs;
-  delete[] objpcomm;
 }
 
 inline void eraseObjFromParrObjs(std::vector<int> & parr_objs, int remove_objid) {
@@ -362,7 +315,6 @@ inline void eraseObjFromParrObjs(std::vector<int> & parr_objs, int remove_objid)
   }
 }
 
-
 inline void printMapping(std::vector<Vertex> &vertices) {
   for (int i = 0; i < vertices.size(); i++) {
     CkPrintf("%d: old map : %d new map : %d\n", i, vertices[i].getCurrentPe(),
@@ -438,6 +390,80 @@ inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold,
 
 }
 
+inline void getPossiblePes(std::vector<int>& possible_pes, int vert,
+    ObjGraph *ogr, ProcArray* parr) {
+  std::map<int, int> tmp_map_pid_index;
+  int counter = 0;
+  int index;
+  int i, j, nbrid;
+  ObjPeCommInfo objpcomm;
+ // CkPrintf("%d sends msgs to %d and recv msgs from %d\n", vert,
+ //   ogr->vertices[vert].sendToList.size(),
+ //   ogr->vertices[vert].recvFromList.size());
+  
+  for (i = 0; i < ogr->vertices[vert].sendToList.size(); i++) {
+    nbrid = ogr->vertices[vert].sendToList[i].getNeighborId();
+    j = ogr->vertices[nbrid].getNewPe(); // Fix me!! New PE
+    // TODO: Should it index with vertexId?
+    if (tmp_map_pid_index.count(j) == 0) {
+      tmp_map_pid_index[j] = counter;
+      PeCommInfo pecomminf(j);
+      // TODO: Shouldn't it use vertexId instead of vert?
+      objpcomm.pcomm.push_back(pecomminf);
+      counter++;
+    }
+    index = tmp_map_pid_index[j];
+
+    objpcomm.pcomm[index].num_msg +=
+      ogr->vertices[vert].sendToList[i].getNumMsgs();
+    objpcomm.pcomm[index].num_bytes +=
+      ogr->vertices[vert].sendToList[i].getNumBytes();
+  }
+
+  for (i = 0; i < ogr->vertices[vert].recvFromList.size(); i++) {
+    nbrid = ogr->vertices[vert].recvFromList[i].getNeighborId();
+    j = ogr->vertices[nbrid].getNewPe();
+
+    if (tmp_map_pid_index.count(j) == 0) {
+      tmp_map_pid_index[j] = counter;
+      PeCommInfo pecomminf(j);
+      // TODO: Shouldn't it use vertexId instead of vert?
+      objpcomm.pcomm.push_back(pecomminf);
+      counter++;
+    }
+    index = tmp_map_pid_index[j];
+
+    objpcomm.pcomm[index].num_msg +=
+      ogr->vertices[vert].sendToList[i].getNumMsgs();
+    objpcomm.pcomm[index].num_bytes +=
+      ogr->vertices[vert].sendToList[i].getNumBytes();
+  }
+
+  // Sort the pe communication vector for this chare
+  std::sort(objpcomm.pcomm.begin(), objpcomm.pcomm.end(),
+      ProcCommGreater());
+
+  int pe_id;
+  int node_id;
+  int node_size;
+  int node_first;
+  //CkPrintf("%d talks to %d pes and possible pes are :\n", vert,
+  //    objpcomm.pcomm.size());
+  for (i = 0; i < objpcomm.pcomm.size(); i++) {
+    pe_id = objpcomm.pcomm[i].pe_id;
+    node_id = CkNodeOf(pe_id);
+    node_size = CkNodeSize(node_id);
+    node_first = CkNodeFirst(node_id);
+   // CkPrintf("smp details pe_id %d, node_id %d, node_size %d, node_first %d\n",
+   //   pe_id, node_id, node_size, node_first);
+    for (j = 0; j < node_size; j++) {
+      possible_pes.push_back(node_first + j);
+      //CkPrintf("\t %d:%d (comm: %d)\n",node_id, node_first+j, objpcomm.pcomm[i].num_bytes); 
+    }
+  }
+}
+
+
 #include "CommAwareRefineLB.def.h"
 
 /*@}*/
index 17de54bccbc0015032f2cbefaa0bce4d746301a3..19997745bb596c59a9c454f50cf32810c100f87e 100644 (file)
@@ -100,7 +100,11 @@ On T3E, we need to have file number control by open/close files only when needed
 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
 
 #if CMK_HAS_COUNTER_PAPI
+#ifdef USE_SPP_PAPI
+int papiEvents[NUMPAPIEVENTS];
+#else
 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
+#endif
 #endif // CMK_HAS_COUNTER_PAPI
 
 /**
@@ -1069,21 +1073,80 @@ TraceProjections::TraceProjections(char **argv):
 
 #if CMK_SMP
   //PAPI_thread_init has to finish before calling PAPI_create_eventset
-  CmiNodeAllBarrier();
+  #if CMK_SMP_TRACE_COMMTHREAD
+      CmiNodeAllBarrier();
+  #else
+      CmiNodeBarrier();
+  #endif
 #endif
   // PAPI 3 mandates the initialization of the set to PAPI_NULL
   papiEventSet = PAPI_NULL; 
   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
     CmiAbort("PAPI failed to create event set!\n");
   }
+#ifdef USE_SPP_PAPI
+  //  CmiPrintf("Using SPP counters for PAPI\n");
+  if(PAPI_query_event(PAPI_FP_OPS)==PAPI_OK) {
+    papiEvents[0] = PAPI_FP_OPS;
+  }else{
+    if(CmiMyPe()==0){
+      CmiAbort("WARNING: PAPI_FP_OPS doesn't exist on this platform!");
+    }
+  }
+  if(PAPI_query_event(PAPI_TOT_INS)==PAPI_OK) {
+    papiEvents[1] = PAPI_TOT_INS;
+  }else{
+    CmiAbort("WARNING: PAPI_TOT_INS doesn't exist on this platform!");
+  }
+  int EventCode;
+  int ret;
+  ret=PAPI_event_name_to_code("perf::PERF_COUNT_HW_CACHE_LL:MISS",&EventCode);
+  if(PAPI_query_event(EventCode)==PAPI_OK) {
+    papiEvents[2] = EventCode;
+  }else{
+    CmiAbort("WARNING: perf::PERF_COUNT_HW_CACHE_LL:MISS doesn't exist on this platform!");
+  }
+  ret=PAPI_event_name_to_code("DATA_PREFETCHER:ALL",&EventCode);
+  if(PAPI_query_event(EventCode)==PAPI_OK) {
+    papiEvents[3] = EventCode;
+  }else{
+    CmiAbort("WARNING: DATA_PREFETCHER:ALL doesn't exist on this platform!");
+  }
+  if(PAPI_query_event(PAPI_L1_DCA)==PAPI_OK) {
+    papiEvents[4] = PAPI_L1_DCA;
+  }else{
+    CmiAbort("WARNING: PAPI_L1_DCA doesn't exist on this platform!");
+  }
+  if(PAPI_query_event(PAPI_TOT_CYC)==PAPI_OK) {
+    papiEvents[5] = PAPI_TOT_CYC;
+  }else{
+    CmiAbort("WARNING: PAPI_TOT_CYC doesn't exist on this platform!");
+  }
+#else
+  // just uses { PAPI_L2_DCM, PAPI_FP_OPS } the 2 initialized PAPI_EVENTS
+#endif
   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
-  if (papiRetValue != PAPI_OK) {
+  if (papiRetValue < 0) {
     if (papiRetValue == PAPI_ECNFLCT) {
       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
     } else {
+      char error_str[PAPI_MAX_STR_LEN];
+      PAPI_perror(papiRetValue,error_str,PAPI_MAX_STR_LEN);
+      CmiPrintf("PAPI failed with error %s val %d\n",error_str,papiRetValue);
       CmiAbort("PAPI failed to add designated events!\n");
     }
   }
+  if(CkMyPe()==0)
+    {
+      CmiPrintf("Registered %d PAPI counters:",NUMPAPIEVENTS);
+      char nameBuf[PAPI_MAX_STR_LEN];
+      for(int i=0;i<NUMPAPIEVENTS;i++)
+       {
+         PAPI_event_code_to_name(papiEvents[i], nameBuf);
+         CmiPrintf("%s ",nameBuf);
+       }
+      CmiPrintf("\n");
+    }
   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
 #endif
 }
index b13dee81764d877a103c56b43ebd93034bd6299f..b2601a4c8aec75c68130dee2a9bbebc94fd50947 100644 (file)
 
 #if CMK_HAS_COUNTER_PAPI
 #include <papi.h>
+#ifdef USE_SPP_PAPI
+#define NUMPAPIEVENTS 6
+#else
 #define NUMPAPIEVENTS 2
 #endif
+#endif
 
 #if CMK_PROJECTIONS_USE_ZLIB
 #include <zlib.h>
index bf881149d0e2ad08486f8711589173da2f4175ea..b3a8c30e374bfb67728a33a75c2a74a342afe497 100644 (file)
@@ -15,6 +15,7 @@
 // #define CACHE_LOCATIONS
 // #define SUPPORT_INCOMPLETE_MESH
 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
+// #define STREAMER_EXPERIMENTAL
 
 struct MeshLocation {
   int dimension; 
@@ -61,6 +62,7 @@ class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
     detectorLocalObj_ = detectorLocalObj;
   }
   void receiveRedeliveredItem(dtype data) {
+    CkPrintf("[%d] redelivered to index %d\n", CkMyPe(), this->thisIndex.data[0]);
     detectorLocalObj_->consume();
     process(data);
   }
@@ -114,7 +116,10 @@ private:
     double progressPeriodInMs_; 
     bool isPeriodicFlushEnabled_; 
     bool hasSentRecently_;
-
+#ifdef STREAMER_EXPERIMENTAL
+    bool hasSentPreviously_;
+    bool immediateMode_; 
+#endif
     MeshStreamerMessage<dtype> ***dataBuffers_;
 
     CProxy_CompletionDetector detector_;
@@ -143,6 +148,7 @@ private:
     virtual void initLocalClients() = 0;
 
     void flushLargestBuffer();
+    void flushToIntermediateDestinations();
 
 protected:
 
@@ -249,6 +255,9 @@ MeshStreamer<dtype>::MeshStreamer(
 
   isPeriodicFlushEnabled_ = false; 
   detectorLocalObj_ = NULL;
+#ifdef STREAMER_EXPERIMENTAL
+  immediateMode_ = false; 
+#endif
 
 #ifdef CACHE_LOCATIONS
   cachedLocations_ = new MeshLocation[numMembers_];
@@ -430,6 +439,10 @@ void MeshStreamer<dtype>::associateCallback(
                          CkCallback startCb, CkCallback endCb, 
                          CProxy_CompletionDetector detector, 
                          int prio) {
+#ifdef STREAMER_EXPERIMENTAL
+  immediateMode_ = false;
+  hasSentPreviously_ = false; 
+#endif
   yieldCount_ = 0; 
   prio_ = prio;
   userCallback_ = endCb; 
@@ -469,20 +482,31 @@ void MeshStreamer<dtype>::finish() {
 template <class dtype>
 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
 
-  int destinationPe; 
+  int destinationPe, lastDestinationPe
   MeshLocation destinationLocation;
 
+  lastDestinationPe = -1;
   for (int i = 0; i < msg->numDataItems; i++) {
     destinationPe = msg->destinationPes[i];
     dtype &dataItem = msg->getDataItem(i);
-    destinationLocation = determineLocation(destinationPe);
     if (destinationPe == CkMyPe()) {
       localDeliver(dataItem);
     }
     else {
+      if (destinationPe != lastDestinationPe) {
+        // do this once per sequence of items with the same destination
+        destinationLocation = determineLocation(destinationPe);
+      }
       storeMessage(destinationPe, destinationLocation, &dataItem);   
     }
+    lastDestinationPe = destinationPe; 
+  }
+
+#ifdef STRAMER_EXPERIMENTAL
+  if (immediateMode_) {
+    flushToIntermediateDestinations();
   }
+#endif
 
   delete msg;
 
@@ -588,8 +612,53 @@ void MeshStreamer<dtype>::flushAllBuffers() {
 }
 
 template <class dtype>
-void MeshStreamer<dtype>::flushDirect(){
+void MeshStreamer<dtype>::flushToIntermediateDestinations() {
+
+  MeshStreamerMessage<dtype> **messageBuffers; 
+  MeshStreamerMessage<dtype> *destinationBuffer; 
+  int destinationIndex, numBuffers; 
+
+  for (int i = 0; i < numDimensions_; i++) {
 
+    messageBuffers = dataBuffers_[i]; 
+    numBuffers = individualDimensionSizes_[i]; 
+
+    for (int j = 0; j < numBuffers; j++) {
+
+      if(messageBuffers[j] == NULL) {
+       continue;
+      }
+
+      messageBuffers = dataBuffers_[i]; 
+      destinationBuffer = messageBuffers[j];
+      destinationIndex = myIndex_ + 
+       (j - myLocationIndex_[i]) * 
+       combinedDimensionSizes_[i] ;
+
+      if (destinationBuffer->numDataItems < bufferSize_) {
+       // not sending the full buffer, shrink the message size
+       envelope *env = UsrToEnv(destinationBuffer);
+       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
+                         (bufferSize_ - destinationBuffer->numDataItems));
+       *((int *) env->getPrioPtr()) = prio_;
+      }
+      numDataItemsBuffered_ -= destinationBuffer->numDataItems;
+
+      if (i == 0) {
+        deliverToDestination(destinationIndex, destinationBuffer);
+      }
+      else {
+       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
+      }
+      messageBuffers[j] = NULL;
+    }
+  }
+}
+
+
+
+template <class dtype>
+void MeshStreamer<dtype>::flushDirect(){
   // flush if (1) this is not a periodic call or 
   //          (2) this is a periodic call and no sending took place
   //              since the last time the function was invoked
@@ -604,6 +673,20 @@ void MeshStreamer<dtype>::flushDirect(){
     
   }
 
+#ifdef STREAMER_EXPERIMENTAL
+  // switch into immediate sending mode when 
+  // number of items buffered is small; avoid doing the switch 
+  // at the beginning before any sending has taken place
+  if (hasSentPreviously_ && 
+      (numDataItemsBuffered_ < .1 * totalBufferCapacity_)) {
+    immediateMode_ = true; 
+  } 
+
+  if (!hasSentPreviously_) {
+    hasSentPreviously_ = hasSentRecently_; 
+  }
+#endif
+
   hasSentRecently_ = false; 
 
 }
index 0b04dfc4dd2451512643e44f02164357f1600c8f..d8fac673670d540919c1e52b7b1e08ab1ef6765b 100644 (file)
@@ -8,15 +8,15 @@ module CkCache {
     entry CkCacheManager(int size, CkGroupID gid);
     entry CkCacheManager(int size, int n, CkGroupID gid[n]);
     entry CkCacheManager(int size, int n, CkGroupID gid[n], int nWB, CkGroupID gidWB[nWB]);
-    entry [local] void * requestData(CkCacheKey what, CkArrayIndex &toWhom, int chunk, CkCacheEntryType *type, CkCacheRequestorData &req);
-    entry [local] void * requestDataNoFetch(CkCacheKey key, int chunk);
+    //entry [local] void * requestData(CkCacheKey what, CkArrayIndex &toWhom, int chunk, CkCacheEntryType *type, CkCacheRequestorData &req);
+    //entry [local] void * requestDataNoFetch(CkCacheKey key, int chunk);
     entry [local] void cacheSync(int &numChunks, CkArrayIndex &chareIdx, int &localIdx);
     entry void recvData(CkCacheFillMsg *msg);
-    entry [local] void recvData(CkCacheKey key, CkArrayIndex &from, CkCacheEntryType *type, int chunk, void *data);
+    //entry [local] void recvData(CkCacheKey key, CkArrayIndex &from, CkCacheEntryType *type, int chunk, void *data);
     entry void writebackChunk(int num);
     entry void finishedChunk(int num, CmiUInt8 weight);
     entry void collectStatistics(CkCallback &cb);
-    entry [local] std::map<CkCacheKey,CkCacheEntry*> *getCache();
-    entry [local] CkCacheEntry *requestCacheEntryNoFetch(CkCacheKey key, int chunk);
+    //entry [local] std::map<CkCacheKey,CkCacheEntry*> *getCache();
+    //entry [local] CkCacheEntry *requestCacheEntryNoFetch(CkCacheKey key, int chunk);
   };
 };