Changes for creating a new type of entry method which could be rescheduled when memor...
authorChao Mei <chaomei2@illinois.edu>
Fri, 18 Sep 2009 22:40:04 +0000 (22:40 +0000)
committerChao Mei <chaomei2@illinois.edu>
Fri, 18 Sep 2009 22:40:04 +0000 (22:40 +0000)
src/ck-core/charm.h
src/ck-core/init.C
src/ck-core/register.C
src/ck-core/register.h
src/conv-core/converse.h
src/conv-core/modifyScheduler.C
src/conv-core/queueing.c
src/conv-core/queueing.h

index 8e63705f20ee802bb335c9da143b0d6e0bbc7127..dcb1ab5a71c085730a6d3c68b34538ebf5752874 100644 (file)
@@ -110,6 +110,8 @@ extern int CkRegisterMsg(const char *name, CkPackFnPtr pack,
 #define CK_EP_INTRINSIC     (1<<3) 
 #define CK_EP_TRACEDISABLE  (1<<4) 
 
+#define CK_EP_MEMCRITICAL (1<<5)
+
 /** A "call function" to invoke a method on an object. See EntryInfo */
 typedef void  (*CkCallFnPtr) (void *msg, void *obj);
 /** Register this entry point, with this call function and flags.
index d75ac3c43e31df059bd2f9be086cc8231de8ca57..0b70143e17b24245233d7b1a2b3aa7d770674f11 100644 (file)
@@ -916,6 +916,18 @@ void _initCharm(int unused_argc, char **argv)
          _TRACE_BEGIN_COMPUTATION();
        }
 
+#ifdef ADAPT_SCHED_MEM
+    if(CkMyRank()==0){
+       memCriticalEntries = new int[numMemCriticalEntries];
+       int memcnt=0;
+       for(int i=0; i<_entryTable.size(); i++){
+           if(_entryTable[i]->isMemCritical){
+               memCriticalEntries[memcnt++] = i;
+           }
+       }
+    }
+#endif
+
 #ifdef _FAULT_MLOG_
     _messageLoggingInit();
 #endif
index d826cf251ebbc6eab3470e7dfea1d0d2853ea029..1890857df7b0996c21dcb97d26b48668f690cb7f 100644 (file)
@@ -56,6 +56,13 @@ int CkRegisterEp(const char *name, CkCallFnPtr call, int msgIdx, int chareIdx,
   if (ck_ep_flags & CK_EP_NOKEEP) e->noKeep=CmiTrue;
   if (ck_ep_flags & CK_EP_INTRINSIC) e->inCharm=CmiTrue;
   if (ck_ep_flags & CK_EP_TRACEDISABLE) e->traceEnabled=CmiFalse;
+#if ADAPT_SCHED_MEM
+  if (ck_ep_flags & CK_EP_MEMCRITICAL){
+     e->isMemCritical=CmiTrue;
+     if (CkMyRank()==0)
+        numMemCriticalEntries++;
+  }
+#endif
   return _entryTable.add(e);
 }
 
index 638a3ffd787e8bb897ee145e6bf7061b7c9b555e..7ff83e931589f5a5ee85e7aa0698a49aa548f3da 100644 (file)
@@ -65,6 +65,10 @@ class EntryInfo {
     /// true if this EP is charm internal functions
     CmiBool inCharm;
     
+#ifdef ADAPT_SCHED_MEM
+   /// true if this EP is used to be rescheduled when adjusting memory usage
+   CmiBool isMemCritical;
+#endif
     /** 
       A "marshall unpack" function:
         1.) Pups method parameters out of the buffer passed in to it.
index 95375d9cac348fa41e6304023917f3b76f2df156..34265c0c04f81770441e6762b4b46f279ed5630e 100644 (file)
@@ -1745,6 +1745,11 @@ extern void setMemoryTypeMessage(void*); /* for memory debugging */
 extern int BgOutOfCoreFlag;
 extern int BgInOutOfCoreMode;
 
+#ifdef ADAPT_SCHED_MEM
+extern int numMemCriticalEntries;
+extern int *memCriticalEntries;
+#endif
+
 #if defined(__cplusplus)
 }
 #endif
index 3caeaaaffae37437e426a7f7b2fb0299877fd491..d3ba2b745ce7356d5260f803ab70829cd1f543c6 100644 (file)
@@ -45,7 +45,31 @@ void CqsIncreasePriorityForEntryMethod(Queue q, const int entrymethod){
     }
 }
  
+#ifdef ADAPT_SCHED_MEM
+/** Search Queue for messages associated with memory-critical entry methods */ 
+void CqsIncreasePriorityForMemCriticalEntries(Queue q){
+    void *removedMsgPtr;
+    int numRemoved;
+
+    numRemoved = CqsFindRemoveSpecificPrioq(&(q->negprioq), removedMsgPtr, memCriticalEntries, numMemCriticalEntries);
+    if(numRemoved == 0)
+       numRemoved = CqsFindRemoveSpecificDeq(&(q->zeroprio), removedMsgPtr, memCriticalEntries, numMemCriticalEntries);
+    if(numRemoved == 0)
+       numRemoved = CqsFindRemoveSpecificPrioq(&(q->posprioq), removedMsgPtr, memCriticalEntries, numMemCriticalEntries);
+    
+    if(numRemoved > 0){
+       CkAssert(numRemoved==1); // We need to reenqueue all removed messages, but we currently only handle one
+       int prio = -1000000; 
+       CqsEnqueueGeneral(q, removedMsgPtr, CQS_QUEUEING_IFIFO, 0, (unsigned int*)&prio);
 
+#ifndef CMK_OPTIMIZE 
+       char traceStr[64];
+       sprintf(traceStr, "Replacing %p in message queue with NULL", removedMsgPtr);
+       traceUserSuppliedNote(traceStr);
+#endif
+    }
+}
+#endif
 
 /** Find and remove the first 1 occurences of messages that matches a specified entry method index.
     The size of the deq will not change, it will just contain an entry for a NULL pointer.
index cedb4b82e7491bdc76f61e9af48c5dccf0eb3e45..9f9f17de6c6323e4c1769fea6f19d24ba9efb9db 100644 (file)
@@ -681,7 +681,8 @@ void CqsDequeue(Queue q, void **resp)
 #ifdef ADAPT_SCHED_MEM
     /* Added by Isaac for testing purposes: */
     if((q->length > 1) && (CmiMemoryUsage() > schedAdaptMemThresholdMB*1024*1024) ){
-       CqsIncreasePriorityForEntryMethod(q, 153);
+       /* CqsIncreasePriorityForEntryMethod(q, 153); */
+       CqsIncreasePriorityForMemCriticalEntries(q); 
     }
 #endif
     
@@ -903,7 +904,10 @@ void CqsRemoveSpecific(Queue q, const void *msgPtr){
 }
 
 
-
+#ifdef ADAPT_SCHED_MEM
+int numMemCriticalEntries=0;
+int *memCriticalEntries=NULL;
+#endif
 
 
 
index 6a058476ed0cf8dfe50dbc08aae539780fd2d44c..49803f33401a358a661f9e4edbcdb05b3c114f9e 100644 (file)
@@ -160,6 +160,9 @@ void CqsDeqEnqueueFifo(deq d, void *data);
 void CqsIncreasePriorityForEntryMethod(Queue q, const int entrymethod);
 void CqsRemoveSpecific(Queue, const void *msgPtr);
 
+#ifdef ADAPT_SCHED_MEM
+void CqsIncreasePriorityForMemCriticalEntries(Queue q);
+#endif
 
 #ifdef __cplusplus
 };