fixed a race condition when an insert element message arrives before CkArray is created
authorGengbin Zheng <gzheng@illinois.edu>
Tue, 8 May 2012 14:24:34 +0000 (09:24 -0500)
committerGengbin Zheng <gzheng@illinois.edu>
Tue, 8 May 2012 14:24:34 +0000 (09:24 -0500)
src/ck-core/ck.C
src/ck-core/ckarray.C

index ac68cdf78bad9189ab3715703d67cd48d0032d03..6cdbf46b53f97866f8a5cb7659a54dcf5110f284 100644 (file)
@@ -1034,6 +1034,11 @@ static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope
        return obj;
 }
 
+IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
+{
+  return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
+}
+
 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
 {
 #if CMK_LBDB_ON
index e21b68eb219f82663bb6c507b56c1aaaaddaa039..969753bfb0aa5d977f6a4d55ba7c2197442c25c8 100644 (file)
@@ -646,12 +646,45 @@ CkArrayID CProxy_ArrayBase::ckCreateEmptyArray(void)
   return ckCreateArray((CkArrayMessage *)CkAllocSysMsg(),0,CkArrayOptions());
 }
 
+extern IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID);
+
+struct CkInsertIdxMsg {
+  char core[CmiReservedHeaderSize];
+  CkArrayIndex idx;
+  CkArrayMessage *m;
+  int ctor;
+  int onPe;
+  CkArrayID _aid;
+};
+
+static int ckinsertIdxHdl;
+
+void ckinsertIdxFunc(void *m)
+{
+  CkInsertIdxMsg *msg = (CkInsertIdxMsg *)m;
+  CProxy_ArrayBase   ca(msg->_aid);
+  ca.ckInsertIdx(msg->m, msg->ctor, msg->onPe, msg->idx);
+  CmiFree(msg);
+}
+
 void CProxy_ArrayBase::ckInsertIdx(CkArrayMessage *m,int ctor,int onPe,
        const CkArrayIndex &idx)
 {
   if (m==NULL) m=(CkArrayMessage *)CkAllocSysMsg();
   m->array_ep()=ctor;
-  ckLocalBranch()->prepareCtorMsg(m,onPe,idx);
+  CkArray *ca = ckLocalBranch();
+  if (ca == NULL) {
+      CkInsertIdxMsg *msg = (CkInsertIdxMsg *)CmiAlloc(sizeof(CkInsertIdxMsg));
+      msg->idx = idx;
+      msg->m = m;
+      msg->ctor = ctor;
+      msg->onPe = onPe;
+      msg->_aid = _aid;
+      CmiSetHandler(msg, ckinsertIdxHdl);
+      ca = (CkArray *)lookupGroupAndBufferIfNotThere(CkpvAccess(_coreState), (envelope*)msg,_aid);
+      if (ca == NULL) return;
+  }
+  ca->prepareCtorMsg(m,onPe,idx);
   if (ckIsDelegated()) {
        ckDelegatedTo()->ArrayCreate(ckDelegatedPtr(),ctor,m,idx,onPe,_aid);
        return;
@@ -706,6 +739,7 @@ void _ckArrayInit(void)
     // disable because broadcast listener may deliver broadcast message
   CkDisableTracing(CkIndex_CkLocMgr::immigrate(0));
   // by default anytime migration is allowed
+  ckinsertIdxHdl = CkRegisterHandler(ckinsertIdxFunc);
 }
 
 CkArray::CkArray(CkArrayOptions &opts,