Changes for out-of-core emulation in BigSim. There is one change to be noticed that...
authorChao Mei <chaomei2@illinois.edu>
Tue, 30 Dec 2008 16:41:21 +0000 (16:41 +0000)
committerChao Mei <chaomei2@illinois.edu>
Tue, 30 Dec 2008 16:41:21 +0000 (16:41 +0000)
src/libs/ck-libs/ampi/ampi.C
src/libs/ck-libs/ampi/ampi.ci
src/libs/ck-libs/ampi/ampiimpl.h

index 00b20dd523d2001397b5467b10f5aad4ac37e201..0ca54532c29de0c5a5f58223e400a994040e434e 100644 (file)
 #define AMPI_PRINT_IDLE 0
 
 /* change this define to "x" to trace all send/recv's */
-#define MSG_ORDER_DEBUG(x) // x /* empty */
+#define MSG_ORDER_DEBUG(x)  //x /* empty */
 /* change this define to "x" to trace user calls */
 #define USER_CALL_DEBUG(x) // ckout<<"vp "<<TCHARM_Element()<<": "<<x<<endl; 
-#define STARTUP_DEBUG(x)  // ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
+#define STARTUP_DEBUG(x)  //ckout<<"ampi[pe "<<CkMyPe()<<"] "<< x <<endl; 
+#define FUNCCALL_DEBUG(x) //x /* empty */
 
 static CkDDT *getDDT(void) {
   return getAmpiParent()->myDDT;
@@ -577,14 +578,15 @@ Called from MPI_Init, a collective initialization call:
 */
 static ampi *ampiInit(char **argv)
 {
+  FUNCCALL_DEBUG(CkPrintf("Calling from proc %d for tcharm element %d\n", CmiMyPe(), TCHARM_Element());)
   if (CtvAccess(ampiInitDone)) return NULL; /* Already called ampiInit */
   STARTUP_DEBUG("ampiInit> begin")
-
+  
   MPI_Comm new_world;
   int _nchunks;
   CkArrayOptions opts;
   CProxy_ampiParent parent;
-  if (TCHARM_Element()==0)
+  if (TCHARM_Element()==0) //the rank of a tcharm object
   { /* I'm responsible for building the arrays: */
        STARTUP_DEBUG("ampiInit> creating arrays")
 
@@ -604,6 +606,9 @@ static ampi *ampiInit(char **argv)
        STARTUP_DEBUG("ampiInit> array size "<<_nchunks);
   }
   int *barrier = (int *)TCharm::get()->semaGet(AMPI_BARRIER_SEMAID);
+
+  FUNCCALL_DEBUG(CkPrintf("After BARRIER: sema size %d from tcharm's ele %d\n", TCharm::get()->sema.size(), TCHARM_Element());)
+
   if (TCHARM_Element()==0)
   {
        //Make a new ampi array
@@ -634,6 +639,7 @@ static ampi *ampiInit(char **argv)
        //Broadcast info. to the mpi_worlds array
        // FIXME: remove race condition from MPI_COMM_UNIVERSE broadcast
        ampiCommStruct newComm(new_world,arr,_nchunks);
+       //CkPrintf("In ampiInit: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
        if (ampiWorldsGroup.ckGetGroupID().isZero())
                ampiWorldsGroup=CProxy_ampiWorlds::ckNew(newComm);
        else
@@ -667,6 +673,27 @@ static ampi *ampiInit(char **argv)
 
   init_operations();     // initialize fortran reduction operation table
 
+  getAmpiParent()->ampiInitCallDone = 0;
+
+  CProxy_ampi cbproxy = ptr->getProxy();
+  CkCallback cb(CkIndex_ampi::allInitDone(NULL), cbproxy[0]);
+  ptr->contribute(0, NULL, CkReduction::sum_int, cb);
+
+  ampiParent *thisParent = getAmpiParent(); 
+  while(thisParent->ampiInitCallDone!=1){
+    //CkPrintf("In checking ampiInitCallDone(%d) loop at parent %d!\n", thisParent->ampiInitCallDone, thisParent->thisIndex);
+    thisParent->getTCharmThread()->stop();
+    /* 
+     * thisParent needs to be updated in case of the parent is being pupped.
+     * In such case, thisParent got changed
+     */
+    thisParent = getAmpiParent();
+  }
+
+#ifdef CMK_BLUEGENE_CHARM
+    BgSetStartOutOfCore();
+#endif
+
   return ptr;
 }
 
@@ -675,6 +702,7 @@ class ampiWorlds : public CBase_ampiWorlds {
 public:
     ampiWorlds(const ampiCommStruct &nextWorld) {
         ampiWorldsGroup=thisgroup;
+       //CkPrintf("In constructor: Current iso block: %p\n", CmiIsomallocBlockListCurrent());
         add(nextWorld);
     }
     ampiWorlds(CkMigrateMessage *m): CBase_ampiWorlds(m) {}
@@ -715,21 +743,36 @@ ampiParent::ampiParent(CkMigrateMessage *msg):CBase_ampiParent(msg) {
 }
 
 void ampiParent::pup(PUP::er &p) {
-
   ArrayElement1D::pup(p);
-//     printf("[%d] Ampiparent being pupped \n",thisIndex);
   p|threads;
-
   p|worldStruct;
   myDDT->pup(p);
   p|splitComm;
   p|groupComm;
   p|groups;
+
+//BIGSIM_OOC DEBUGGING
+//if(!p.isUnpacking()){
+//    CmiPrintf("ampiParent[%d] packing ampiRequestList: \n", thisIndex);
+//    ampiReqs.print();
+//}
+
   p|ampiReqs;
+
+//BIGSIM_OOC DEBUGGING
+//if(p.isUnpacking()){
+//    CmiPrintf("ampiParent[%d] unpacking ampiRequestList: \n", thisIndex);
+//    ampiReqs.print();
+//}
+
   p|RProxyCnt;
   p|tmpRProxy;
   p|winStructList;
   p|infos;
+
+  p|worldNo;  
+
+  p|ampiInitCallDone;
 }
 void ampiParent::prepareCtv(void) {
   thread=threads[thisIndex].ckLocal();
@@ -785,6 +828,16 @@ void ampiParent::ckJustMigrated(void) {
   prepareCtv();
 }
 
+void ampiParent::ckJustRestored(void) {
+  FUNCCALL_DEBUG(CkPrintf("Call just restored from ampiParent[%d] with ampiInitCallDone %d\n", thisIndex, ampiInitCallDone);)
+  ArrayElement1D::ckJustRestored();
+  prepareCtv();
+  
+  //BIGSIM_OOC DEBUGGING
+  //CkPrintf("In ampiParent[%d]:   ",thisIndex);
+  //CthPrintThdMagic(thread->getTid()); 
+}
+
 ampiParent::~ampiParent() {
   STARTUP_DEBUG("ampiParent> destructor called");
   finalize();
@@ -835,6 +888,43 @@ TCharm *ampiParent::registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration)
   return thread;
 }
 
+//BIGSIM_OOC DEBUGGING
+//Move the comm2ampi from inline to normal function for the sake of debugging
+/*ampi *ampiParent::comm2ampi(MPI_Comm comm){
+      //BIGSIM_OOC DEBUGGING
+      //CmiPrintf("%d, in ampiParent::comm2ampi, comm=%d\n", thisIndex, comm);
+      if (comm==MPI_COMM_WORLD) return worldPtr;
+      if (comm==MPI_COMM_SELF) return worldPtr;
+      if (comm==worldNo) return worldPtr;
+      if (isSplit(comm)) {
+         const ampiCommStruct &st=getSplit(comm);
+         return st.getProxy()[thisIndex].ckLocal();
+      }
+      if (isGroup(comm)) {
+         const ampiCommStruct &st=getGroup(comm);
+         return st.getProxy()[thisIndex].ckLocal();
+      }
+      if (isCart(comm)) {
+        const ampiCommStruct &st = getCart(comm);
+        return st.getProxy()[thisIndex].ckLocal();
+      }
+      if (isGraph(comm)) {
+        const ampiCommStruct &st = getGraph(comm);
+        return st.getProxy()[thisIndex].ckLocal();
+      }
+      if (isInter(comm)) {
+         const ampiCommStruct &st=getInter(comm);
+         return st.getProxy()[thisIndex].ckLocal();
+      }
+      if (isIntra(comm)) {
+         const ampiCommStruct &st=getIntra(comm);
+         return st.getProxy()[thisIndex].ckLocal();
+      }
+      if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
+      CkAbort("Invalid communicator used!");
+      return NULL;
+}*/
+
 // reduction client data - preparation for checkpointing
 class ckptClientStruct {
 public:
@@ -1038,6 +1128,17 @@ void ampi::ckJustMigrated(void)
        ArrayElement1D::ckJustMigrated();
 }
 
+void ampi::ckJustRestored(void)
+{
+       FUNCCALL_DEBUG(CkPrintf("Call just restored from ampi[%d]\n", thisIndex);)
+       findParent(true);
+       ArrayElement1D::ckJustRestored();
+       
+       //BIGSIM_OOC DEBUGGING
+       //CkPrintf("In ampi[%d]:   ", thisIndex);
+       //CthPrintThdMagic(thread->getTid()); 
+}
+
 void ampi::findParent(bool forMigration) {
         STARTUP_DEBUG("ampi> finding my parent")
        parent=parentProxy[thisIndex].ckLocal();
@@ -1047,12 +1148,50 @@ void ampi::findParent(bool forMigration) {
 //     printf("[%d] ampi index %d TCharm thread pointer %p \n",CkMyPe(),thisIndex,thread);
 }
 
+//The following method should be called on the first element of the
+//ampi array
+void ampi::allInitDone(CkReductionMsg *m){
+    FUNCCALL_DEBUG(CkPrintf("All mpi_init have been called!\n");)
+    thisProxy.setInitDoneFlag();
+    delete m;
+}
+
+void ampi::setInitDoneFlag(){
+    //CkPrintf("ampi[%d]::setInitDone called!\n", thisIndex);
+    parent->ampiInitCallDone=1;
+    parent->getTCharmThread()->start();
+}
+
 static void cmm_pup_ampi_message(pup_er p,void **msg) {
        CkPupMessage(*(PUP::er *)p,msg,1);
        if (pup_isDeleting(p)) delete (AmpiMsg *)*msg;
 //     printf("[%d] pupping ampi message %p \n",CkMyPe(),*msg);
 }
 
+static void cmm_pup_posted_ireq(pup_er p,void **msg) {
+
+       pup_int(p, (int *)msg);
+
+/*     if(pup_isUnpacking(p)){
+           //*msg = new IReq;
+           //when unpacking, nothing is needed to do since *msg is an index
+           //(of type integer) to the ampiParent::ampiReqs (the AmpiRequestList)
+       }
+       if(!pup_isUnpacking(p)){
+           AmpiRequestList *reqL = getReqs();
+           int retIdx = reqL->findRequestIndex((IReq *)*msg);
+           if(retIdx==-1){
+               CmiAbort("An AmpiRequest instance should be found for an instance in posted_ireq!\n");
+           }
+           pup_int(p, retIdx)
+       }
+*/
+//     ((IReq *)*msg)->pup(*(PUP::er *)p);
+
+//     if (pup_isDeleting(p)) delete (IReq *)*msg;
+//     printf("[%d] pupping postd irequests %p \n",CkMyPe(),*msg);
+}
+
 void ampi::pup(PUP::er &p)
 {
   if(!p.isUserlevel())
@@ -1078,8 +1217,17 @@ void ampi::pup(PUP::er &p)
   }
 
   msgs=CmmPup((pup_er)&p,msgs,cmm_pup_ampi_message);
-  posted_ireqs = CmmNew();             // FIXME 
-//     printf("[%d] ampi index %d msgs table pointer %p\n",CkMyPe(),thisIndex,msgs);
+
+  //BIGSIM_OOC DEBUGGING
+  //if(!p.isUnpacking()){
+    //CkPrintf("ampi[%d]::packing: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
+  //}
+
+  posted_ireqs = CmmPup((pup_er)&p, posted_ireqs, cmm_pup_posted_ireq);
+
+  //if(p.isUnpacking()){
+    //CkPrintf("ampi[%d]::unpacking: posted_ireqs: %p with %d\n", thisIndex, posted_ireqs, CmmEntries(posted_ireqs));
+  //}
 
   p|seqEntries;
   p|oorder;
@@ -1087,18 +1235,19 @@ void ampi::pup(PUP::er &p)
 
 ampi::~ampi()
 {
-  if (CkInRestarting()) {
+  if (CkInRestarting() || BgOutOfCoreFlag==1) {
     // in restarting, we need to flush messages
-  int tags[3], sts[3];
-  tags[0] = tags[1] = tags[2] = CmmWildCard;
-  AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
-  while (msg) {
-    delete msg;
-    msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
-  }
+    int tags[3], sts[3];
+    tags[0] = tags[1] = tags[2] = CmmWildCard;
+    AmpiMsg *msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
+    while (msg) {
+      delete msg;
+      msg = (AmpiMsg *) CmmGet(msgs, 3, tags, sts);
+    }
   }
+
   CmmFree(msgs);
-  CmmFree(posted_ireqs);         // FIXME
+  CmmFreeAll(posted_ireqs);
 }
 
 //------------------------ Communicator Splitting ---------------------
@@ -1488,7 +1637,7 @@ void
 ampi::generic(AmpiMsg* msg)
 {
 MSG_ORDER_DEBUG(
-  CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
+       CkPrintf("AMPI vp %d arrival: tag=%d, src=%d, comm=%d  (from %d, seq %d) resumeOnRecv %d\n",
        thisIndex,msg->tag,msg->srcRank,msg->comm, msg->srcIdx, msg->seq,resumeOnRecv);
 )
 
@@ -1509,10 +1658,13 @@ MSG_ORDER_DEBUG(
   }
   
   if(resumeOnRecv){
+    //CkPrintf("Calling TCharm::resume at ampi::generic!\n");
     thread->resume();
   }
 }
 
+inline static AmpiRequestList *getReqs(void); 
+
 void
 ampi::inorder(AmpiMsg* msg)
 {
@@ -1525,7 +1677,13 @@ MSG_ORDER_DEBUG(
   tags[0] = msg->tag; tags[1] = msg->srcRank; tags[2] = msg->comm;
   IReq *ireq = NULL;
   if (CpvAccess(CmiPICMethod) != 2) {
-    IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
+    //IReq *ireq = (IReq *)CmmGet(posted_ireqs, 3, tags, sts);
+    //in case ampi has not initialized and posted_ireqs are only inserted 
+    //at AMPI_Irecv (MPI_Irecv)
+    AmpiRequestList *reqL = &(parent->ampiReqs);
+    int ireqIdx = (int)((long)CmmGet(posted_ireqs, 3, tags, sts));
+    if(reqL->size()>0)
+       ireq = (IReq *)(*reqL)[ireqIdx];
     if (ireq) {        // receive posted
       ireq->receive(this, msg);
       ireq->tag = sts[0];
@@ -1854,9 +2012,27 @@ AmpiMsg *AmpiSeqQ::getOutOfOrder(int srcIdx)
   return 0;
 }
 
+//BIGSIM_OOC DEBUGGING: Output for AmpiRequest and its children classes
+void AmpiRequest::print(){
+            CmiPrintf("In AmpiRequest: buf=%p, count=%d, type=%d, src=%d, tag=%d, comm=%d, isvalid=%d\n", buf, count, type, src, tag, comm, isvalid);
+}
+
+void PersReq::print(){
+    AmpiRequest::print();
+    CmiPrintf("In PersReq: sndrcv=%d\n", sndrcv);
+}
+
+void IReq::print(){
+    AmpiRequest::print();
+    CmiPrintf("In IReq: this=%p, status=%d, length=%d\n", this, statusIreq, length);
+}
+
+void ATAReq::print(){ //not complete for myreqs
+    AmpiRequest::print();
+    CmiPrintf("In ATAReq: elmcount=%d, idx=%d\n", elmcount, idx);
+} 
 
 void AmpiRequestList::pup(PUP::er &p) { 
-       CmiMemoryCheck();
        if(!CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC)){
                return;
        }
@@ -1900,7 +2076,6 @@ void AmpiRequestList::pup(PUP::er &p) {
        if(p.isDeleting()){
                freeBlock();
        }
-       CmiMemoryCheck();
 }
 
 //------------------ External Interface -----------------
@@ -2028,6 +2203,7 @@ CDECL void AMPI_Setmigratable(MPI_Comm comm, int mig){
 
 CDECL int AMPI_Init(int *p_argc, char*** p_argv)
 {
+    //AMPIAPI("AMPI_Init");
   if (nodeinit_has_been_called) {
     AMPIAPI("AMPI_Init");
     char **argv;
@@ -2040,6 +2216,7 @@ CDECL int AMPI_Init(int *p_argc, char*** p_argv)
   { /* Charm hasn't been started yet! */
     CkAbort("AMPI_Init> Charm is not initialized!");
   }
+
   return 0;
 }
 
@@ -2273,6 +2450,9 @@ int AMPI_Barrier(MPI_Comm comm)
 #endif
   //HACK: Use collective operation as a barrier.
   AMPI_Allreduce(NULL,NULL,0,MPI_INT,MPI_SUM,comm);
+
+  //BIGSIM_OOC DEBUGGING
+  //CkPrintf("%d: in AMPI_Barrier, after AMPI_Allreduce\n", getAmpiParent()->thisIndex);
 #if CMK_BLUEGENE_CHARM
   //TRACE_BG_AMPI_BARRIER_END(barrierLog);
   _TRACE_BG_SET_INFO(NULL, "AMPI_Barrier_END",  &barrierLog, 1);
@@ -2746,13 +2926,41 @@ int PersReq::wait(MPI_Status *sts){
 int IReq::wait(MPI_Status *sts){
   if (CpvAccess(CmiPICMethod) != 2) 
   {
+
+       //Copy "this" to a local variable in the case that "this" pointer
+       //is updated during the out-of-core emulation.
+
           // optimization for Irecv
           // generic() writes directly to the buffer, so the only thing we
           // do here is to wait
        ampi *ptr = getAmpiInstance(comm);
-       while (status == false) {
+
+       //BIGSIM_OOC DEBUGGING
+       //int ooccnt=0;
+       //int ampiIndex = ptr->thisIndex;
+       //CmiPrintf("%d: IReq's status=%d\n", ampiIndex, statusIreq);
+       
+       while (statusIreq == false) {
+               //BIGSIM_OOC DEBUGGING
+               //CmiPrintf("Before blocking: %dth time: %d: in Ireq::wait\n", ++ooccnt, ptr->thisIndex);
+               //print();
+
                ptr->resumeOnRecv=true;
                ptr->block();
+                       
+               //BIGSIM_OOC DEBUGGING
+               //CmiPrintf("After blocking: %dth time: %d: in Ireq::wait\n", ooccnt, ampiIndex);
+               //CmiPrintf("IReq's this pointer: %p\n", this);
+               //print();
+
+
+           #if CMK_BLUEGENE_CHARM
+               //Because of the out-of-core emulation, this pointer is changed after in-out
+               //memory operation. So we need to return from this function and do the while loop
+               //in the outer function call.   
+               if(BgInOutOfCoreMode)
+                   return -1;
+           #endif      
        }
        ptr->resumeOnRecv=false;
         if(sts) {
@@ -2814,7 +3022,16 @@ int AMPI_Wait(MPI_Request *request, MPI_Status *sts)
 #endif
   
   AMPI_DEBUG("MPI_Wait: request=%d, reqs.size=%d, &reqs=%d\n",*request,reqs->size(),reqs);
-  (*reqs)[*request]->wait(sts);
+  //(*reqs)[*request]->wait(sts);
+  int waitResult = -1;
+  do{
+    AmpiRequest *waitReq = (*reqs)[*request];
+    waitResult = waitReq->wait(sts);
+    if(BgInOutOfCoreMode){
+       reqs = getReqs();
+    }
+  }while(waitResult==-1);
+  
 
 #ifdef AMPIMSGLOG
   if(msgLogWrite && pptr->thisIndex == msgLogRank){
@@ -2841,7 +3058,7 @@ int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
   checkRequests(count,request);
   int i,j,oldPe;
   AmpiRequestList* reqs = getReqs();
-  CkVec<CkVec<int> > *reqvec = vecIndex(count,request);\
+  CkVec<CkVec<int> > *reqvec = vecIndex(count,request);
 
 #ifdef AMPIMSGLOG
   ampiParent* pptr = getAmpiParent();
@@ -2871,13 +3088,22 @@ int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
 #endif
   for(i=0;i<reqvec->size();i++){
     for(j=0;j<((*reqvec)[i]).size();j++){
+      //CkPrintf("in loop [%d, %d]\n", i, j);
       if(request[((*reqvec)[i])[j]] == MPI_REQUEST_NULL){
         stsempty(sts[((*reqvec)[i])[j]]);
         continue;
       }
       oldPe = CkMyPe();
-      AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
-      waitReq->wait(&sts[((*reqvec)[i])[j]]);
+
+      int waitResult = -1;
+      do{      
+       AmpiRequest *waitReq = ((*reqs)[request[((*reqvec)[i])[j]]]);
+       waitResult = waitReq->wait(&sts[((*reqvec)[i])[j]]);
+       if(BgInOutOfCoreMode){
+           reqs = getReqs();
+           reqvec = vecIndex(count, request);
+       }
+      }while(waitResult==-1);
 
 #ifdef AMPIMSGLOG
       if(msgLogWrite && pptr->thisIndex == msgLogRank){
@@ -2887,7 +3113,7 @@ int AMPI_Waitall(int count, MPI_Request request[], MPI_Status sts[])
        PUParray(*(pptr->toPUPer), (char *)(&sts[((*reqvec)[i])[j]]), sizeof(MPI_Status));
       }
 #endif
-
+    
 #if 1
       if(oldPe != CkMyPe()){
                        reqs = getReqs();
@@ -2988,7 +3214,7 @@ void PersReq::complete(MPI_Status *sts){
 }
 
 CmiBool IReq::test(MPI_Status *sts){
-        if (status == true) {           
+        if (statusIreq == true) {           
          if(sts)
             sts->MPI_LENGTH = length;           
          return true;
@@ -3012,9 +3238,13 @@ void IReq::complete(MPI_Status *sts){
 void IReq::receive(ampi *ptr, AmpiMsg *msg)
 {
     int sts = ptr->processMessage(msg, tag, src, buf, count, type);
-    status = (sts == 0);
+    statusIreq = (sts == 0);
     length = msg->length;
     delete msg;
+    
+    //BIGSIM_OOC DEBUGGING
+    //CmiPrintf("In IReq::receive, this=%p ", this);
+    //print();
 }
 
 CmiBool ATAReq::test(MPI_Status *sts){
@@ -3326,7 +3556,15 @@ int AMPI_Irecv(void *buf, int count, MPI_Datatype type, int src,
       // post receive
     int tags[3];
     tags[0] = tag; tags[1] = src; tags[2] = comm;
-    CmmPut(ptr->posted_ireqs, 3, tags, newreq);
+    //CmmPut(ptr->posted_ireqs, 3, tags, newreq);
+    //just insert the index of the newreq in the ampiParent::ampiReqs
+    //to posted_ireqs. Such change is due to the need for Out-of-core Emulation
+    //in BigSim. Before this change, posted_ireqs and ampiReqs both hold pointers to
+    //AmpiRequest instances. After going through the Pupping routines, both will have
+    //pointers to different AmpiRequest instances and no longer refer to the same AmpiRequest
+    //instance. Therefore, to keep both always accessing the same AmpiRequest instance,
+    //posted_ireqs stores the index (an integer) to ampiReqs.  
+    CmmPut(ptr->posted_ireqs, 3, tags, (void *)(*request));
   }
   
 #if AMPI_COUNTER
index 879fa52376e484f70826681dd713530c61bbdf6d..d66c82a7f952080d4c63bb62313dc2320419d2b4 100644 (file)
@@ -17,6 +17,8 @@ module ampi {
     entry [notrace] ampi(CkArrayID parent_,ampiCommStruct s);
     entry [notrace] ampi(CkArrayID parent_,ampiCommStruct s,ComlibInstanceHandle ciStreaming_,
                ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_);
+    entry void allInitDone(CkReductionMsg *m);
+    entry void setInitDoneFlag();
     entry void unblock(void);
     entry void generic(AmpiMsg *);
     entry void reduceResult(CkReductionMsg *m);
index 4b27c0d0877fe7e4218ea12e00b0ee3bfa537fbf..3a10956fcc2ab39d14d3ead4e0339b57ef11b91d 100644 (file)
@@ -600,6 +600,9 @@ public:
                p(comm);
                p(isvalid);
        }
+       
+       //added due to BIGSIM_OOC DEBUGGING
+       virtual void print();
 };
 
 class PersReq : public AmpiRequest {
@@ -623,18 +626,20 @@ public:
                AmpiRequest::pup(p);
                p(sndrcv);
        }
+       //added due to BIGSIM_OOC DEBUGGING
+       virtual void print();
 };
 
 class IReq : public AmpiRequest {
 public:
-       bool status;
+       bool statusIreq;
        int length;     // recv'ed length
        IReq(void *buf_, int count_, int type_, int src_, int tag_, MPI_Comm comm_)
        {
                buf=buf_;  count=count_;  type=type_;  src=src_;  tag=tag_; 
-               comm=comm_;  isvalid=true; status=false; length=0;
+               comm=comm_;  isvalid=true; statusIreq=false; length=0;
        }
-       IReq(): status(false){};
+       IReq(): statusIreq(false){};
        ~IReq(){ }
        CmiBool test(MPI_Status *sts);
        void complete(MPI_Status *sts);
@@ -643,8 +648,10 @@ public:
        void receive(ampi *ptr, AmpiMsg *msg); 
        virtual void pup(PUP::er &p){
                AmpiRequest::pup(p);
-               p|status;  p|length;
+               p|statusIreq;  p|length;
        }
+       //added due to BIGSIM_OOC DEBUGGING
+       virtual void print();
 };
 
 class ATAReq : public AmpiRequest {
@@ -701,6 +708,8 @@ public:
                        delete []myreqs;
                }
        }
+       //added due to BIGSIM_OOC DEBUGGING
+       virtual void print();
 };
 
 /// Special CkVec<AmpiRequest*> for AMPI. Most code copied from cklist.h
@@ -771,13 +780,31 @@ class AmpiRequestList : private CkSTLHelper<AmpiRequest *> {
       push_back(elt);
       return len-1;
     }
-    
+
     inline void checkRequest(MPI_Request idx){
       if(!(idx==-1 || (idx < this->len && (block[idx])->isValid())))
         CkAbort("Invalide MPI_Request\n");
     }
 
+    //find an AmpiRequest by its pointer value
+    //return -1 if not found!
+    int findRequestIndex(AmpiRequest *req){
+       for(int i=0; i<len; i++){
+           if(block[i]==req) return i;
+       }
+       return -1;
+    }
+
     void pup(PUP::er &p);
+    
+    //BIGSIM_OOC DEBUGGING
+    void print(){
+       for(int i=0; i<len; i++){
+           if(block[i]==NULL) continue;
+           CmiPrintf("AmpiRequestList Element %d [%p]: \n", i+1, block[i]);
+           block[i]->print();
+       }
+    }
 };
 
 //A simple memory buffer
@@ -1091,10 +1118,14 @@ class ampiParent : public CBase_ampiParent {
     int RProxyCnt;
     CProxy_ampi tmpRProxy;
 
+public:
+    int ampiInitCallDone;
+
 public:
     ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_);
     ampiParent(CkMigrateMessage *msg);
     void ckJustMigrated(void);
+    void ckJustRestored(void);
     ~ampiParent();
 
     ampi *lookupComm(MPI_Comm comm) {
@@ -1161,6 +1192,7 @@ public:
     void startCheckpoint(char* dname);
     void Checkpoint(int len, char* dname);
     void ResumeThread(void);
+    TCharm* getTCharmThread() {return thread;}
 
     inline const ampiCommStruct &comm2CommStruct(MPI_Comm comm) {
       if (comm==MPI_COMM_WORLD) return worldStruct;
@@ -1174,6 +1206,8 @@ public:
       if (isIntra(comm)) return getIntra(comm);
       return universeComm2CommStruct(comm);
     }
+    
+     //ampi *comm2ampi(MPI_Comm comm);
     inline ampi *comm2ampi(MPI_Comm comm) {
       if (comm==MPI_COMM_WORLD) return worldPtr;
       if (comm==MPI_COMM_SELF) return worldPtr;
@@ -1275,6 +1309,10 @@ public:
     CkDDT myDDTsto;
     CkDDT *myDDT;
     AmpiRequestList ampiReqs;
+
+    //added to make sure post_ireqs in ampi class share the same pointers
+    //with those in ampiReqs after pupping routines.
+    //AmpiRequestList oldAmpiReqs;
     
     int addWinStruct(WinStruct* win);
     WinStruct getWinStruct(MPI_Win win);
@@ -1351,9 +1389,14 @@ friend class IReq;
        ComlibInstanceHandle ciBcast_,ComlibInstanceHandle ciAllgather_,ComlibInstanceHandle ciAlltoall_);
     ampi(CkMigrateMessage *msg);
     void ckJustMigrated(void);
+    void ckJustRestored(void);
     ~ampi();
 
     virtual void pup(PUP::er &p);
+
+    void allInitDone(CkReductionMsg *m);
+    void setInitDoneFlag();
+  
     void block(void);
     void unblock(void);
     void yield(void);