Adding support for modifying entries in the Charm++ message queues.
authorIsaac Dooley <idooley2@illinois.edu>
Mon, 14 Sep 2009 18:22:49 +0000 (18:22 +0000)
committerIsaac Dooley <idooley2@illinois.edu>
Mon, 14 Sep 2009 18:22:49 +0000 (18:22 +0000)
src/conv-core/modifyScheduler.C [new file with mode: 0644]
src/conv-core/queueing.c
src/conv-core/queueing.h
src/scripts/Makefile

diff --git a/src/conv-core/modifyScheduler.C b/src/conv-core/modifyScheduler.C
new file mode 100644 (file)
index 0000000..6aff160
--- /dev/null
@@ -0,0 +1,49 @@
+/** 
+    @file 
+    Routines for modifying the Charm++ prioritized message queue
+    @ingroup CharmScheduler
+    
+    @addtogroup CharmScheduler
+    @{
+ */
+
+
+#include "charm++.h"
+#include "queueing.h" // For access to scheduler data structures
+#include "conv-trace.h"
+
+/** Search Queue for messages associated with a specified entry method */ 
+void* CqsIncreasePriorityForEntryMethod(Queue q, const int entrymethod){
+  int i;
+  void **entries;
+  int numMessages = q->length;
+  
+  CqsEnumerateQueue(q, &entries);
+  
+  for(i=0;i<numMessages;i++){
+    envelope *env = (envelope*)entries[i];
+    if(env != NULL){
+      if(env->getMsgtype() == ForArrayEltMsg || env->getMsgtype() == ForChareMsg){
+       const int ep = env->getsetArrayEp();
+       if(ep==entrymethod){
+         // Remove the entry from the queue      
+         CqsRemoveSpecific(q,env);
+         
+         int prio = -50000; 
+         CqsEnqueueGeneral(q, (void*)env, CQS_QUEUEING_IFIFO, 0, (unsigned int*)&prio);
+
+         char traceStr[64];
+         sprintf(traceStr, "Replacing %p in prioq with NULL", env);
+         traceUserSuppliedNote(traceStr);
+
+         break;
+       }
+      }  
+    }
+  }
+  
+  CmiFree(entries);
+}
+
+
+/** @} */
index 4e9e1bef7afed7131d4a4ad0366d1817c8915868..879b0e9927e75a5a8d2d67793ffdbe5ecbc74685 100644 (file)
 #include "queueing.h"
 
 /** @defgroup CharmScheduler 
-    \brief The portion of Charm++ responsible for scheduling the execution 
+    @brief The portion of Charm++ responsible for scheduling the execution 
     of Charm++ entry methods
 
     CqsEnqueueGeneral() is the main function that is responsible for enqueueing 
     messages. It will store the messages in one of three queues based on the 
-    specified priorities or strategies.
+    specified priorities or strategies. The Charm++ message queue is really three 
+    queues, one for positive priorities, one for zero priorities, and one for 
+    negative priorities. The positive and negative priorty queues are actually heaps.
+
 
     The charm++ messages are only scheduled after the converse message queues
     have been emptied. After that, a message is pulled from the Charm++ queue
     through a call to CqsDequeue().
 
-    The Charm++ message queue is really three queues, one for positive 
-    priorities, one for zero priorities, and one for negative priorities.
-    The positive and negative priorty queues are actually heaps.
-
 
     @addtogroup CharmScheduler
     @{
@@ -667,6 +666,13 @@ void CqsEnqueue(Queue q, void *data)
 /** Retrieve the highest priority message (one with most negative priority) */
 void CqsDequeue(Queue q, void **resp)
 {
+#if 0
+  /* Added by Isaac for testing purposes: */
+  if(CmiMemoryUsage() > 1000*1024*1024 ){
+    CqsIncreasePriorityForEntryMethod(q, 153);
+  }
+#endif
+
   if (q->length==0) 
     { *resp = 0; return; }
   if (q->negprioq.heapnext>1)
@@ -697,6 +703,12 @@ Queue q;
 /*   return CqsGetPriority(q); */
 /* } */
 
+
+/** Produce an array containing all the entries in a deq
+    @return a newly allocated array filled with copies of the (void*) elements in the deq. 
+    @param [in] q a deq
+    @param [out] num the number of pointers in the returned array
+*/
 void** CqsEnumerateDeq(deq q, int *num){
   void **head, **tail;
   void **result;
@@ -728,6 +740,11 @@ void** CqsEnumerateDeq(deq q, int *num){
   return(result);
 }
 
+/** Produce an array containing all the entries in a prioq
+    @return a newly allocated array filled with copies of the (void*) elements in the prioq. 
+    @param [in] q a deq
+    @param [out] num the number of pointers in the returned array
+*/
 void** CqsEnumeratePrioq(prioq q, int *num){
   void **head, **tail;
   void **result;
@@ -747,7 +764,7 @@ void** CqsEnumeratePrioq(prioq q, int *num){
     }
   }
 
-  result = (void **)CmiAlloc(count * sizeof(void *));
+  result = (void **)CmiAlloc((count) * sizeof(void *));
   *num = count;
   
   j = 0;
@@ -767,6 +784,11 @@ void** CqsEnumeratePrioq(prioq q, int *num){
   return result;
 }
 
+/** Produce an array containing all the entries in a Queue
+    @return a newly allocated array filled with copies of the (void*) elements in the Queue. 
+    @param [in] q a Queue
+    @param [out] resp an array of pointer entries found in the Queue
+*/
 void CqsEnumerateQueue(Queue q, void ***resp){
   void **result;
   int num;
@@ -798,4 +820,77 @@ void CqsEnumerateQueue(Queue q, void ***resp){
 }
 
 
+
+
+/** Remove first occurence of a specified entry from the deq  by setting the entry to NULL.
+    The size of the deq will not change, it will now just contain an entry for a NULL pointer.
+
+    @return number of entries that were replaced with NULL
+*/
+int CqsRemoveSpecificDeq(deq q, const void *msgPtr){
+  void **head, **tail;
+
+  head = q->head;
+  tail = q->tail;
+
+  while(head != tail){
+    if(*head == msgPtr){
+      //    CmiPrintf("Replacing %p in deq with NULL\n", msgPtr);
+      //     *head = NULL;
+      return 1;
+    }
+    head++;
+    if(head == q->end)
+      head = q->bgn;
+  }
+  return 0;
+}
+
+
+
+/** Remove first occurence of a specified entry from the prioq by setting the entry to NULL.
+    The size of the prioq will not change, it will now just contain an entry for a NULL pointer.
+
+    @return number of entries that were replaced with NULL
+*/
+int CqsRemoveSpecificPrioq(prioq q, const void *msgPtr){
+  void **head, **tail;
+  void **result;
+  int i,j;
+  prioqelt pe;
+
+  for(i = 1; i < q->heapnext; i++){
+    pe = (q->heap)[i];
+    head = pe->data.head;
+    tail = pe->data.tail;
+    while(head != tail){
+      if(*head == msgPtr){
+       //      CmiPrintf("Replacing %p in prioq with NULL\n", msgPtr);
+       *head = NULL;
+       return 1;
+      }     
+      head++;
+      if(head == (pe->data).end)
+       head = (pe->data).bgn;
+    }
+  } 
+  return 0;
+}
+
+
+
+/** Remove an occurence of a specified entry from the Queue by setting its entry to NULL. 
+    The size of the Queue will not change, it will now just contain an entry for a NULL pointer.
+*/
+void CqsRemoveSpecific(Queue q, const void *msgPtr){
+  if( CqsRemoveSpecificPrioq(&(q->negprioq), msgPtr) == 0 )
+    if( CqsRemoveSpecificDeq(&(q->zeroprio), msgPtr) == 0 )  
+      if(CqsRemoveSpecificPrioq(&(q->posprioq), msgPtr) == 0){
+       CmiPrintf("Didn't remove the specified entry because it was not found\n");
+      }  
+}
+
+
+
+
 /** @} */
index b9793283ae98f869a1edf0c7495c55cc81474c64..1fe5fb1852e2ed1391a6323b5bd60fe0d3b36b55 100644 (file)
@@ -36,7 +36,7 @@ typedef struct prio_struct
 }
 *prio;
 
-/** An untyped double ended queue stored in a circular buffer, with internal space for 4 entries */
+/** A double ended queue of void* pointers stored in a circular buffer, with internal space for 4 entries */
 typedef struct deq_struct
 {
   /* Note: if head==tail, circ is empty */
@@ -49,7 +49,7 @@ typedef struct deq_struct
 *deq;
 
 #ifndef FASTQ
-/** An element in a priority queue, which contains a deque and some other stuff. */
+/** An bucket in a priority queue which contains a deque(storing the void* pointers) and references to other buckets in the hash table. */
 typedef struct prioqelt_struct
 {
   struct deq_struct data;
@@ -79,10 +79,10 @@ typedef struct prioqelt_struct
 /*#endif */
 
 /*#ifndef FASTQ*/
-/** A priority queue, implemented as a heap of prioqelts */
+/** A priority queue, implemented as a heap of prioqelt_struct buckets (each bucket represents a single priority value and contains a deque of void* pointers) */
 typedef struct prioq_struct
 {
-  int heapsize;  /**< An array of prioqelt's */
+  int heapsize; 
   int heapnext;
   prioqelt *heap; /**< An array of prioqelt's */
   prioqelt *hashtab;
@@ -103,7 +103,9 @@ typedef struct prioq1_struct
 */
 
 /*#ifndef FASTQ*/
-/** A set of 3 queues: a positive priority prioq_struct, a negative priority prioq_struct, and a zero priority deq_struct */
+/** A set of 3 queues: a positive priority prioq_struct, a negative priority prioq_struct, and a zero priority deq_struct.
+    If the user modifies the queue, NULL entries may be present, and hence NULL values will be returned by CqsDequeue().
+*/
 typedef struct Queue_struct
 {
   unsigned int length;
@@ -143,6 +145,14 @@ int CqsEmpty(Queue);
 int CqsPrioGT(prio, prio);
 prio CqsGetPriority(Queue);
 
+deq CqsPrioqGetDeq(prioq pq, unsigned int priobits, unsigned int *priodata);
+void *CqsPrioqDequeue(prioq pq);
+void CqsDeqEnqueueFifo(deq d, void *data);
+
+void* CqsIncreasePriorityForEntryMethod(Queue q, const int entrymethod);
+void CqsRemoveSpecific(Queue, const void *msgPtr);
+
+
 #ifdef __cplusplus
 };
 #endif
index 87b9ac9fee6fb4aa947c18022d24aca2910475d6..8d405762ba0dd0b14ac81f76b8353652a10fdddd 100644 (file)
@@ -358,7 +358,7 @@ LIBCONV_CORE= convcore.o conv-conds.o queueing.o msgmgr.o \
        traceCore.o traceCoreCommon.o tracec.o \
        converseProjections.o machineProjections.o \
        quiescence.o isomalloc.o conv-counter.o \
-       global-nop.o cmipool.o cpuaffinity.o cputopology.o
+       global-nop.o cmipool.o cpuaffinity.o cputopology.o modifyScheduler.o
 
 #############################################
 #Comlib Core objects that go into libck.a