Code cleanup. Shorter header for streaming and enable array caching again.
[charm.git] / src / ck-com / ComlibManager.C
index 0d734c663b89878052f5503a8209863b388c33f0..801c17b7a07fa06e5ef9b7789b36524e7f9039bd 100644 (file)
@@ -116,7 +116,7 @@ void ComlibManager::init(){
     strategyTable = CkpvAccess(conv_com_ptr)->getStrategyTable();
     
     receivedTable = 0;
-    flushTable = 0;
+    setupComplete = 0;
 
     barrierReached = 0;
     barrier2Reached = 0;
@@ -250,7 +250,6 @@ void ComlibManager::endIteration(){
     
     (* strategyTable)[curStratID].elementCount++;
     int count = 0;
-    flushTable = 1;
     
     if((* strategyTable)[curStratID].elementCount == (* strategyTable)[curStratID].numElements) {
         
@@ -277,13 +276,13 @@ void ComlibManager::receiveTable(StrategyWrapper &sw,
     ComlibPrintf("[%d] In receiveTable %d, ite=%d\n", CkMyPe(), sw.nstrats, 
                  clibIteration);
 
-    clibIteration ++;
     receivedTable = 1;
 
     delete CkpvAccess(locationTable);
     CkpvAccess(locationTable) =  NULL;
 
-    CkpvAccess(locationTable) = new CkHashtableT<ClibGlobalArrayIndex, int>;
+    CkpvAccess(locationTable) = new 
+        CkHashtableT<ClibGlobalArrayIndex, int>;
 
     CkHashtableIterator *ht_iterator = htable.iterator();
     ht_iterator->seekStart();
@@ -333,7 +332,7 @@ void ComlibManager::receiveTable(StrategyWrapper &sw,
             (CharmStrategy *)CkpvAccess(conv_com_ptr)->getStrategy(loc)) 
            != NULL) {
             old_strategy->finalizeProcessing();
-
+            
             //Unregister from array listener if array strategy
             if(old_strategy->getType() == ARRAY_STRATEGY) {
                 ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
@@ -359,7 +358,7 @@ void ComlibManager::receiveTable(StrategyWrapper &sw,
            cur_strategy->isBracketed()){ 
 
             ComlibPrintf("Inserting Array Listener\n");
-
+            
             ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
             as.getSourceArray(st_aid, st_elem, st_nelements);
             
@@ -402,15 +401,19 @@ void ComlibManager::receiveTable(StrategyWrapper &sw,
 
 void ComlibManager::resumeFromBarrier2(){
 
+    setupComplete = 1;
+
     barrier2Reached = 1;
     barrierReached = 0;
 
+    clibIteration ++;
+
     ComlibPrintf("[%d] Barrier 2 reached nstrats = %d, ite = %d\n", CkMyPe(), CkpvAccess(conv_com_ptr)->nstrats, clibIteration);
 
-    //    if(flushTable) {
-    for (int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
+    for (int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
+         count ++) {
         if (!(* strategyTable)[count].tmplist.isEmpty()) {
-            CharmMessageHolder *cptr;
+            
             while (!(* strategyTable)[count].tmplist.isEmpty()) {
                 CharmMessageHolder *cmsg = (CharmMessageHolder *) 
                     (* strategyTable)[count].tmplist.deq();
@@ -418,13 +421,14 @@ void ComlibManager::resumeFromBarrier2(){
                 if((*strategyTable)[count].strategy->getType() == 
                    ARRAY_STRATEGY) {
                     if(cmsg->dest_proc >= 0) {
-                        envelope *env  = UsrToEnv(cmsg->getCharmMessage()); 
-                        cmsg->dest_proc = getLastKnown(env->getsetArrayMgr(), 
-                                                       env->getsetArrayIndex());
+                        envelope *env  = UsrToEnv(cmsg->getCharmMessage());
+                        cmsg->dest_proc = getLastKnown
+                            (env->getsetArrayMgr(),
+                             env->getsetArrayIndex());
                     }
                     //else multicast
                 }                                
-
+                
                 if(cmsg->dest_proc == CkMyPe()) {
                     CmiSyncSendAndFree(CkMyPe(), cmsg->size,
                                        (char *)
@@ -442,7 +446,7 @@ void ComlibManager::resumeFromBarrier2(){
             (* strategyTable)[count].strategy->doneInserting();
         }
     }
-    //}    
+    
     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
 }
 
@@ -488,7 +492,7 @@ void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg,
     //With migration some array messages may be directly sent Also no
     //message processing should happen before the comlib barriers have
     //gone through
-    if(dest_proc == CkMyPe() && receivedTable){                
+    if(dest_proc == CkMyPe() && setupComplete){                
         CkArray *amgr = (CkArray *)_localBranch(a);
         amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
         
@@ -502,14 +506,12 @@ void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg,
     CharmMessageHolder *cmsg = new 
         CharmMessageHolder((char *)msg, dest_proc);
     
-    ComlibPrintf("[%d] Before Insert on strat %d received = %d\n", CkMyPe(), curStratID, receivedTable);
+    ComlibPrintf("[%d] Before Insert on strat %d received = %d\n", CkMyPe(), curStratID, setupComplete);
     
-    if (receivedTable)        
+    if (setupComplete)        
         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
-    else {
-        flushTable = 1;
+    else 
         (* strategyTable)[curStratID].tmplist.enq(cmsg);
-    }
     
     //CmiPrintf("After Insert\n");
 }
@@ -532,14 +534,14 @@ void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, Ck
                  UsrToEnv(msg)->getTotalsize());
 
     register envelope * env = UsrToEnv(msg);
-    if(dest_proc == CkMyPe() && receivedTable){
+    if(dest_proc == CkMyPe() && setupComplete){
         _SET_USED(env, 0);
         CkSendMsgBranch(ep, msg, dest_proc, gid);
         return;
     }
     
     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
-    CpvAccess(_qd)->create(1);
+    CkpvAccess(_qd)->create(1);
 
     env->setMsgtype(ForBocMsg);
     env->setEpIdx(ep);
@@ -553,10 +555,9 @@ void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, Ck
     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc); 
     //get rid of the new.
     
-    if(receivedTable)
+    if(setupComplete)
         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
     else {
-        flushTable = 1;
         (* strategyTable)[curStratID].tmplist.enq(cmsg);
     }
 }
@@ -647,7 +648,7 @@ void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g
     
     register envelope * env = UsrToEnv(m);
     
-    CpvAccess(_qd)->create(1);
+    CkpvAccess(_qd)->create(1);
 
     env->setMsgtype(ForBocMsg);
     env->setEpIdx(ep);
@@ -678,10 +679,9 @@ void ComlibManager::multicast(CharmMessageHolder *cmsg) {
 
     //Will be used to detect multicast message for learning
     
-    if (receivedTable)
+    if (setupComplete)
        (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
     else {
-        flushTable = 1;
        ComlibPrintf("Enqueuing message in tmplist at %d\n", curStratID);
         (* strategyTable)[curStratID].tmplist.enq(cmsg);
     }
@@ -799,6 +799,7 @@ void ComlibManager::AtSync() {
 
     barrier2Reached = 0;
     receivedTable = 0;
+    setupComplete = 0;
     barrierReached = 0;
     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
 
@@ -807,8 +808,10 @@ void ComlibManager::AtSync() {
     CkVec<ClibGlobalArrayIndex> gidx_vec;
 
     CkVec<CkArrayID> tmp_vec;
-    for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
-        if((* strategyTable)[count].strategy->getType() == ARRAY_STRATEGY) {
+    for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
+        count ++) {
+        if((* strategyTable)[count].strategy->getType() 
+           == ARRAY_STRATEGY) {
             CharmStrategy *cstrat = (CharmStrategy*)
                 ((* strategyTable)[count].strategy);