/*
 * Decompiled with CFR 0.152.
 */
package parReg.execute;

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.cache.execute.InternalExecution;
import hydra.Log;
import hydra.RemoteTestModule;
import hydra.StopSchedulingTaskOnClientOrder;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import parReg.ParRegUtil;
import parReg.execute.ExecuteExceptionBB;
import parReg.execute.ExecutionAndColocationTest;
import parReg.execute.MemberResultsCollector;
import parReg.execute.PartialResultsExecutionFunction;
import rebalance.RebalanceBB;
import recovDelay.BucketState;
import util.PRObserver;
import util.TestException;

/**
 * Hydra tasks that combine function execution with adding capacity and
 * rebalancing.
 *
 * <p>Coordination between VMs goes through the {@link RebalanceBB} shared map:
 * {@link #EXTRA_VM_IDS} holds the ids of VMs that registered through
 * {@link #HydraTask_updateBBWithExtraVmIds()}, and {@link #NEXT_VM_TO_BE_UP}
 * holds the id of the VM whose turn it is to add capacity and rebalance.
 */
public class FunctionExecutionWithRebalancingTest
extends ExecutionAndColocationTest {
    /** Shared-map key under which the set of registered extra VM ids is stored. */
    public static final String EXTRA_VM_IDS = "Extra_Vm_Ids";
    /** Shared-map key under which the id of the next VM to rebalance is stored. */
    public static final String NEXT_VM_TO_BE_UP = "Next_Vm_To_Be_Up";
    /** Source of the operation id sent along with every function execution from this VM. */
    private static final AtomicInteger opId = new AtomicInteger(0);

    /**
     * Returns the inherited {@code testInstance} viewed as this test class.
     *
     * @return the current test instance, or {@code null} if none has been created
     */
    private static FunctionExecutionWithRebalancingTest getTestInstance() {
        return (FunctionExecutionWithRebalancingTest)testInstance;
    }

    /**
     * Reads the set of extra VM ids from the blackboard with a single lookup.
     *
     * @return the stored set, or a new empty set when nothing is stored under
     *         {@link #EXTRA_VM_IDS}; never {@code null}
     */
    @SuppressWarnings("unchecked") // the only writers in this class store a HashSet of Integer vm ids
    private static HashSet<Integer> readExtraVmIds() {
        Object stored = RebalanceBB.getBB().getSharedMap().get(EXTRA_VM_IDS);
        if (stored == null) {
            return new HashSet<Integer>();
        }
        return (HashSet<Integer>)stored;
    }

    /**
     * Counts this VM as a data store on the blackboard if no test instance
     * exists yet, then delegates to the superclass initialization task.
     */
    public static synchronized void HydraTask_HA_dataStoreInitialize() {
        if (testInstance == null) {
            RebalanceBB.getBB().getSharedCounters().increment(RebalanceBB.numDataStores);
        }
        ExecutionAndColocationTest.HydraTask_HA_dataStoreInitialize();
    }

    /**
     * Adds capacity and rebalances when it is this VM's turn, then hands the
     * turn to the next registered VM. Does nothing when it is not this VM's turn.
     *
     * @throws StopSchedulingTaskOnClientOrder once this VM has completed its
     *         rebalance, so the task is not scheduled on this client again
     */
    public static void HydraTask_addCapacityAndReBalance() {
        if (testInstance == null) {
            testInstance = new FunctionExecutionWithRebalancingTest();
        }
        FunctionExecutionWithRebalancingTest test = FunctionExecutionWithRebalancingTest.getTestInstance();
        if (!test.isNextTurn()) {
            return;
        }
        Log.getLogWriter().info("Got true, going to add capacity");
        test.addCapacityAndReBalance();
        FunctionExecutionWithRebalancingTest.HydraTask_createTheNextTurn();
        throw new StopSchedulingTaskOnClientOrder("Completed the rebalancing for the node");
    }

    /**
     * Registers this VM's id in the {@link #EXTRA_VM_IDS} set on the blackboard
     * and prints the shared map.
     */
    public static synchronized void HydraTask_updateBBWithExtraVmIds() {
        // NOTE(review): this read-modify-write is guarded only by this class's
        // monitor; whether other VMs can interleave with it is not visible here.
        HashSet<Integer> extraVmIdsSet = FunctionExecutionWithRebalancingTest.readExtraVmIds();
        extraVmIdsSet.add(Integer.valueOf(RemoteTestModule.getMyVmid()));
        RebalanceBB.getBB().getSharedMap().put(EXTRA_VM_IDS, extraVmIdsSet);
        RebalanceBB.getBB().printSharedMap();
    }

    /**
     * Picks one VM id from the {@link #EXTRA_VM_IDS} set, publishes it under
     * {@link #NEXT_VM_TO_BE_UP} and writes the set back without that id.
     *
     * <p>When the set is missing or empty the blackboard is left untouched.
     */
    public static synchronized void HydraTask_createTheNextTurn() {
        // A single lookup that tolerates a missing entry, so a run in which no
        // VM has registered does not fail with a NullPointerException here.
        HashSet<Integer> extraVmIdsSet = FunctionExecutionWithRebalancingTest.readExtraVmIds();
        Log.getLogWriter().info("HydraTask_createTheNextTurn" + extraVmIdsSet);
        Iterator<Integer> extraVmSetIterator = extraVmIdsSet.iterator();
        if (!extraVmSetIterator.hasNext()) {
            return;
        }
        Integer nextVmToBeUp = extraVmSetIterator.next();
        if (nextVmToBeUp == null) {
            return;
        }
        RebalanceBB.getBB().getSharedMap().put(NEXT_VM_TO_BE_UP, nextVmToBeUp);
        extraVmIdsSet.remove(nextVmToBeUp);
        RebalanceBB.getBB().getSharedMap().put(EXTRA_VM_IDS, extraVmIdsSet);
        Log.getLogWriter().info("Next Vm id to do rebalance is " + nextVmToBeUp);
    }

    /**
     * Runs {@link #doFEWithExceptionsAndVerify()} on the current test instance.
     */
    public static void HydraTask_doFEWithExceptionsAndVerify() {
        FunctionExecutionWithRebalancingTest.getTestInstance().doFEWithExceptionsAndVerify();
    }

    /**
     * Tells whether the blackboard currently names this VM as the next one to
     * add capacity.
     *
     * @return {@code true} if {@link #NEXT_VM_TO_BE_UP} is set and equals this
     *         VM's id, {@code false} otherwise
     */
    public boolean isNextTurn() {
        Integer myVmId = Integer.valueOf(RemoteTestModule.getMyVmid());
        Log.getLogWriter().info("In isNextTurn() for the vm " + myVmId);
        Integer nextTurnVmId = (Integer)RebalanceBB.getBB().getSharedMap().get(NEXT_VM_TO_BE_UP);
        Log.getLogWriter().info("next vm is " + nextTurnVmId);
        boolean myTurn = nextTurnVmId != null && nextTurnVmId.equals(myVmId);
        Log.getLogWriter().info("returning " + myTurn);
        return myTurn;
    }

    /**
     * Creates the data store region in this VM, rebalances, and verifies the
     * bucket distribution before and after the rebalance.
     *
     * @throws TestException if the number of VMs holding buckets before the
     *         rebalance differs from the previous data store count, if this VM
     *         holds no buckets after the rebalance, or if the number of VMs
     *         holding buckets afterwards differs from the new data store count
     */
    public void addCapacityAndReBalance() {
        PRObserver.installObserverHook();
        Integer myVmId = Integer.valueOf(RemoteTestModule.getMyVmid());
        super.initInstance("dataStoreRegion");
        Log.getLogWriter().info("After creating region, region size is " + this.aRegion.size());

        // The counter now includes this VM; the pre-rebalance check uses the count without it.
        long numDataStores = RebalanceBB.getBB().getSharedCounters().incrementAndRead(RebalanceBB.numDataStores);
        long numDataStoresBefore = numDataStores - 1L;
        if (this.aRegion.getAttributes().getPartitionAttributes().getRedundantCopies() > 0) {
            PRObserver.waitForRebalRecov(myVmId, 1, 1, null, null, false);
        }

        // Index 0 is used as the primaries map and index 1 as the buckets map.
        Map[] mapArr = BucketState.getBucketMaps(this.aRegion);
        Map primaryMap = mapArr[0];
        Map bucketMap = mapArr[1];
        Log.getLogWriter().info("Before rebalance, buckets: " + bucketMap + ", primaries: " + primaryMap);
        if ((long)bucketMap.size() != numDataStoresBefore) {
            throw new TestException("Expected " + numDataStoresBefore + " vms to contain buckets, but " + bucketMap.size() + " vms contains buckets, buckeMap: " + bucketMap + ", primaryMap: " + primaryMap);
        }

        ParRegUtil.doRebalance();

        mapArr = BucketState.getBucketMaps(this.aRegion);
        primaryMap = mapArr[0];
        bucketMap = mapArr[1];
        // Give this VM an explicit zero entry in both maps so the messages below always mention it.
        if (primaryMap.get(myVmId) == null) {
            primaryMap.put(myVmId, Integer.valueOf(0));
        }
        Object myBucketCount = bucketMap.get(myVmId);
        if (myBucketCount == null) {
            bucketMap.put(myVmId, Integer.valueOf(0));
        }
        Log.getLogWriter().info("After rebalance, buckets: " + bucketMap + ", primaries: " + primaryMap);
        if (myBucketCount == null || ((Integer)myBucketCount).intValue() == 0) {
            throw new TestException("New capacity vm did not gain any buckets, bucketMap " + bucketMap + ", primaryMap " + primaryMap);
        }
        if ((long)bucketMap.size() != numDataStores) {
            throw new TestException("Expected " + numDataStores + " vms to contain buckets, but " + bucketMap.size() + " vms contains buckets, buckeMap: " + bucketMap + ", primaryMap: " + primaryMap);
        }
    }

    /**
     * Runs the function executions and verifies their results; delegates to
     * {@link #doFuncExecsAndVerify()}.
     */
    public void doFEWithExceptionsAndVerify() {
        this.doFuncExecsAndVerify();
    }

    /**
     * Executes the test function on the members of the distributed system, if
     * one is connected, and on {@code aRegion}, if it is set, verifying each
     * execution with a fresh {@link MemberResultsCollector}.
     */
    public void doFuncExecsAndVerify() {
        InternalDistributedSystem ds = InternalDistributedSystem.getAnyInstance();
        if (ds != null) {
            this.doFunctionExecutionAndVerifyResults((InternalExecution)FunctionService.onMembers((DistributedSystem)ds), new MemberResultsCollector());
        }
        if (this.aRegion != null) {
            this.doFunctionExecutionAndVerifyResults((InternalExecution)FunctionService.onRegion((Region)this.aRegion), new MemberResultsCollector());
        }
    }

    /**
     * Executes {@link PartialResultsExecutionFunction} on the given execution
     * and, if it fails with a {@link FunctionException}, verifies that the
     * number of reported exceptions matches the blackboard's exception-node
     * counter.
     *
     * @param dataSet the execution to run the function on
     * @param resultCollector the collector that receives the results
     * @throws TestException if a {@link FunctionException} carries a different
     *         number of exceptions than recorded in
     *         {@code ExecuteExceptionBB.exceptionNodes}
     */
    public void doFunctionExecutionAndVerifyResults(InternalExecution dataSet, MemberResultsCollector resultCollector) {
        int thisOpId = opId.incrementAndGet();
        // Reset both counters before the execution; only exceptionNodes is checked below.
        ExecuteExceptionBB.getBB().getSharedCounters().zero(ExecuteExceptionBB.exceptionNodes);
        ExecuteExceptionBB.getBB().getSharedCounters().zero(ExecuteExceptionBB.sendResultsNodes);
        InternalExecution finalDataSet = (InternalExecution)dataSet.withCollector((ResultCollector)resultCollector).withArgs((Object)new PartialResultsExecutionFunction.SenderIdWithOpId(RemoteTestModule.getMyVmid(), thisOpId));
        finalDataSet.setWaitOnExceptionFlag(true);
        try {
            Log.getLogWriter().info("Going to execute for opId=" + thisOpId);
            ResultCollector rc = finalDataSet.execute((Function)new PartialResultsExecutionFunction());
            rc.getResult();
        }
        catch (FunctionException e) {
            Log.getLogWriter().info("Caught Exception " + e);
            Log.getLogWriter().info("Result is " + resultCollector.getPartialResults());
            Log.getLogWriter().info("Exceptions are " + e.getExceptions());
            long numOfExceptionNodes = ExecuteExceptionBB.getBB().getSharedCounters().read(ExecuteExceptionBB.exceptionNodes);
            if ((long)e.getExceptions().size() != numOfExceptionNodes) {
                throw new TestException("Did not receive all the relevant exceptions " + numOfExceptionNodes + " and " + e.getExceptions());
            }
        }
    }
}

