Adding support for collectives and parallel recovery.
[charm.git] / src / ck-core / ckcausalmlog.C
index 381b79f706b7d4259396fe3d98becb9e99df45cb..4aed8567663589afb0399d05c93fb209849cd7ed 100644 (file)
@@ -133,7 +133,8 @@ CpvDeclare(char **, _storeDetsPtrs);
 /***** *****/
 
 /***** VARIABLES FOR PARALLEL RECOVERY *****/
-CpvDeclare(CkVec<LocationID *> *, _emigrantRecObjs);
+CpvDeclare(int, _numEmigrantRecObjs);
+CpvDeclare(int, _numImmigrantRecObjs);
 CpvDeclare(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
 /***** *****/
 
@@ -333,10 +334,13 @@ void _messageLoggingInit(){
        CpvAccess(_storeDetsPtrs) = (char **) CmiAlloc(sizeof(char *) * 2);
 
        // Cpv variables for parallel recovery
-       CpvInitialize(CkVec<LocationID *> *, _emigrantRecObjs);
-       CpvAccess(_emigrantRecObjs) = new CkVec<LocationID *>;
-       CpvInitialize(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
-       CpvAccess(_immigrantRecObjs) = new CkVec<CkLocRec_local *>;
+       CpvInitialize(int, _numEmigrantRecObjs);
+    CpvAccess(_numEmigrantRecObjs) = 0;
+    CpvInitialize(int, _numImmigrantRecObjs);
+    CpvAccess(_numImmigrantRecObjs) = 0;
+
+    CpvInitialize(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
+    CpvAccess(_immigrantRecObjs) = new CkVec<CkLocRec_local *>;
 
        //Cpv variables for checkpoint
        CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
@@ -2903,20 +2907,17 @@ public:
                CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
                idx.print();
 
-               // inserting the emigrant object to the list of emigrant recovery objects
-               // every broadcast message will be sent to these objects
-               LocationID *location = new LocationID();
-               location->idx = idx;
-               location->gid = loc.getManager()->getGroupID();
-               location->PE = *targetPE;
-               CpvAccess(_emigrantRecObjs)->push_back(location);       
+               // incrementing number of emigrant objects
+               CpvAccess(_numEmigrantRecObjs)++;
                        
                //pack up this location and send it across
                PUP::sizer psizer;
                pupLocation(loc,psizer);
-               int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
+               int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
                char *msg = (char *)CmiAlloc(totalSize);
-               char *buf = &msg[CmiMsgHeaderSizeBytes];
+               DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
+               distributeMsg->PE = CkMyPe();
+               char *buf = &msg[sizeof(DistributeObjectMsg)];
                PUP::toMem pmem(buf);
                pmem.becomeDeleting();
                pupLocation(loc,pmem);
@@ -2955,7 +2956,9 @@ void distributeRestartedObjects(){
  */
 void _distributedLocationHandler(char *receivedMsg){
        printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
-       char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
+       DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
+       int sourcePE = distributeMsg->PE;
+       char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
        PUP::fromMem pmem(buf);
        CkGroupID gID;
        CkArrayIndexMax idx;
@@ -2974,6 +2977,7 @@ void _distributedLocationHandler(char *receivedMsg){
 
        // adding object to the list of immigrant recovery objects
        CpvAccess(_immigrantRecObjs)->push_back((CkLocRec_local *)rec);
+       CpvAccess(_numImmigrantRecObjs)++;
        
        CkVec<CkMigratable *> eltList;
        mgr->migratableList((CkLocRec_local *)rec,eltList);
@@ -2981,6 +2985,7 @@ void _distributedLocationHandler(char *receivedMsg){
                if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
                        CpvAccess(_currentObj) = eltList[i];
                        eltList[i]->mlogData->immigrantRecFlag = 1;
+                       eltList[i]->mlogData->immigrantSourcePE = sourcePE;
                        eltList[i]->ResumeFromSync();
                }
        }