/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache.partitioned;

import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.Token;
import com.gemstone.gemfire.internal.cache.partitioned.StreamingPartitionOperation;
import dunit.Host;
import dunit.VM;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class StreamingPartitionOperationOneDUnitTest
extends CacheTestCase {
    CacheSerializableRunnable createPrRegionWithDS_DACK = new CacheSerializableRunnable("createPrRegionWithDS"){

        @Override
        public void run2() throws CacheException {
            Cache cache = StreamingPartitionOperationOneDUnitTest.this.getCache();
            AttributesFactory attr = new AttributesFactory();
            PartitionAttributesFactory paf = new PartitionAttributesFactory();
            paf.setTotalNumBuckets(5);
            PartitionAttributes prAttr = paf.create();
            attr.setPartitionAttributes(prAttr);
            RegionAttributes regionAttribs = attr.create();
            cache.createRegion("PR1", regionAttribs);
        }
    };
    protected static final int NUM_INTEGERS = 819200;

    public StreamingPartitionOperationOneDUnitTest(String name) {
        super(name);
    }

    public void testStreamingPartitionOneProviderNoExceptions() throws Exception {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        InternalDistributedMember otherId = (InternalDistributedMember)vm0.invoke(new IDGetter(), "getMemberId");
        vm0.invoke(this.createPrRegionWithDS_DACK);
        this.createPrRegionWithDS_DACK.run2();
        int regionId = ((PartitionedRegion)this.getCache().getRegion("PR1")).getPRId();
        Set<InternalDistributedMember> setOfIds = Collections.singleton(otherId);
        TestStreamingPartitionOperationOneProviderNoExceptions streamOp = new TestStreamingPartitionOperationOneProviderNoExceptions(this.getSystem(), regionId);
        try {
            streamOp.getPartitionedDataFrom(setOfIds);
        }
        catch (VirtualMachineError e) {
            SystemFailure.initiateFailure((Error)e);
            throw e;
        }
        catch (Throwable t) {
            StreamingPartitionOperationOneDUnitTest.fail("getPartitionedDataFrom failed", t);
        }
        StreamingPartitionOperationOneDUnitTest.assertTrue((boolean)streamOp.dataValidated);
    }

    public static final class TestStreamingPartitionMessageOneProviderNoExceptions
    extends StreamingPartitionOperation.StreamingPartitionMessage {
        private int nextInt = -10;
        private int count = 0;

        public TestStreamingPartitionMessageOneProviderNoExceptions() {
        }

        public TestStreamingPartitionMessageOneProviderNoExceptions(Set recipients, int regionId, ReplyProcessor21 processor) {
            super(recipients, regionId, processor);
        }

        protected Object getNextReplyObject(PartitionedRegion pr) throws ReplyException {
            if (++this.count > 819200) {
                return Token.END_OF_STREAM;
            }
            this.nextInt += 10;
            return new Integer(this.nextInt);
        }

        public int getDSFID() {
            return Integer.MAX_VALUE;
        }
    }

    public static class TestStreamingPartitionOperationOneProviderNoExceptions
    extends StreamingPartitionOperation {
        ConcurrentMap chunkMap = new ConcurrentHashMap();
        int numChunks = -1;
        volatile boolean dataValidated = false;

        public TestStreamingPartitionOperationOneProviderNoExceptions(InternalDistributedSystem sys, int regionId) {
            super(sys, regionId);
        }

        protected DistributionMessage createRequestMessage(Set recipients, ReplyProcessor21 processor) {
            TestStreamingPartitionMessageOneProviderNoExceptions msg = new TestStreamingPartitionMessageOneProviderNoExceptions(recipients, this.regionId, processor);
            return msg;
        }

        protected synchronized boolean processData(List objects, InternalDistributedMember sender, int sequenceNum, boolean lastInSequence) {
            LogWriter logger = this.sys.getLogWriter();
            List prevValue = this.chunkMap.putIfAbsent(new Integer(sequenceNum), objects);
            if (prevValue != null) {
                logger.severe("prevValue != null");
            }
            if (lastInSequence) {
                if (this.numChunks != -1) {
                    logger.severe("this.numChunks != -1");
                }
                this.numChunks = sequenceNum + 1;
            }
            if (this.chunkMap.size() == this.numChunks) {
                this.validateData();
            } else {
                if (this.numChunks != -1 && this.chunkMap.size() >= this.numChunks) {
                    logger.severe("this.numChunks != -1 && this.chunkMap.size() >= this.numChunks");
                }
                if (this.chunkMap.size() >= 200) {
                    logger.warning("this.chunkMap.size() >= 200");
                }
            }
            return true;
        }

        private void validateData() {
            ArrayList[] arrayOfLists = new ArrayList[this.numChunks];
            int expectedInt = 0;
            LogWriter logger = this.sys.getLogWriter();
            for (Map.Entry entry : this.chunkMap.entrySet()) {
                int seqNum = (Integer)entry.getKey();
                List objList = (List)entry.getValue();
                arrayOfLists[seqNum] = objList;
            }
            int count = 0;
            for (int i = 0; i < this.numChunks; ++i) {
                for (Integer nextInteger : arrayOfLists[i]) {
                    if (nextInteger != expectedInt) {
                        logger.severe("nextInteger.intValue() != expectedInt");
                        return;
                    }
                    expectedInt += 10;
                    ++count;
                }
            }
            if (count != 819200) {
                logger.severe("found " + count + " integers, expected " + 819200);
            } else {
                this.dataValidated = true;
                logger.info("Received " + count + " integers in " + this.numChunks + " chunks");
            }
        }
    }

    class IDGetter
    implements Serializable {
        IDGetter() {
        }

        InternalDistributedMember getMemberId() {
            return StreamingPartitionOperationOneDUnitTest.this.getSystem().getDistributedMember();
        }
    }
}

