/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.distributed.internal.streaming;

import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.streaming.StreamingOperation;
import com.gemstone.gemfire.internal.cache.Token;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class StreamingOperationOneDUnitTest
extends DistributedTestCase {
    protected static final int NUM_INTEGERS = 819200;

    public StreamingOperationOneDUnitTest(String name) {
        super(name);
    }

    public void testStreamingOneProviderNoExceptions() throws Exception {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        class IDGetter
        implements Serializable {
            IDGetter() {
            }

            InternalDistributedMember getMemberId() {
                return StreamingOperationOneDUnitTest.this.getSystem().getDistributedMember();
            }
        }
        InternalDistributedMember otherId = (InternalDistributedMember)vm0.invoke(new IDGetter(), "getMemberId");
        Set<InternalDistributedMember> setOfIds = Collections.singleton(otherId);
        TestStreamingOperationOneProviderNoExceptions streamOp = new TestStreamingOperationOneProviderNoExceptions(this.getSystem());
        streamOp.getDataFromAll(setOfIds);
        StreamingOperationOneDUnitTest.assertTrue((boolean)streamOp.dataValidated);
    }

    public static final class TestRequestStreamingMessageOneProviderNoExceptions
    extends StreamingOperation.RequestStreamingMessage {
        private int nextInt = -10;
        private int count = 0;

        protected Object getNextReplyObject() throws ReplyException {
            if (++this.count > 819200) {
                return Token.END_OF_STREAM;
            }
            this.nextInt += 10;
            return new Integer(this.nextInt);
        }

        public int getDSFID() {
            return Integer.MAX_VALUE;
        }
    }

    public static class TestStreamingOperationOneProviderNoExceptions
    extends StreamingOperation {
        ConcurrentMap chunkMap = new ConcurrentHashMap();
        int numChunks = -1;
        volatile boolean dataValidated = false;

        public TestStreamingOperationOneProviderNoExceptions(InternalDistributedSystem sys) {
            super(sys);
        }

        protected DistributionMessage createRequestMessage(Set recipients, ReplyProcessor21 processor) {
            TestRequestStreamingMessageOneProviderNoExceptions msg = new TestRequestStreamingMessageOneProviderNoExceptions();
            msg.processorId = processor == null ? 0 : processor.getProcessorId();
            msg.setRecipients(recipients);
            return msg;
        }

        protected synchronized boolean processData(List objects, InternalDistributedMember sender, int sequenceNum, boolean lastInSequence) {
            LogWriter logger = this.sys.getLogWriter();
            List prevValue = this.chunkMap.putIfAbsent(new Integer(sequenceNum), objects);
            if (prevValue != null) {
                logger.severe("prevValue != null");
            }
            if (lastInSequence) {
                if (this.numChunks != -1) {
                    logger.severe("this.numChunks != -1");
                }
                this.numChunks = sequenceNum + 1;
            }
            if (this.chunkMap.size() == this.numChunks) {
                this.validateData();
            } else {
                if (this.numChunks != -1 && this.chunkMap.size() >= this.numChunks) {
                    logger.severe("this.numChunks != -1 && this.chunkMap.size() >= this.numChunks");
                }
                if (this.chunkMap.size() >= 200) {
                    logger.warning("this.chunkMap.size() >= 200");
                }
            }
            return true;
        }

        private void validateData() {
            ArrayList[] arrayOfLists = new ArrayList[this.numChunks];
            int expectedInt = 0;
            LogWriter logger = this.sys.getLogWriter();
            for (Map.Entry entry : this.chunkMap.entrySet()) {
                int seqNum = (Integer)entry.getKey();
                List objList = (List)entry.getValue();
                arrayOfLists[seqNum] = objList;
            }
            int count = 0;
            for (int i = 0; i < this.numChunks; ++i) {
                for (Integer nextInteger : arrayOfLists[i]) {
                    if (nextInteger != expectedInt) {
                        logger.severe("nextInteger.intValue() != expectedInt");
                        return;
                    }
                    expectedInt += 10;
                    ++count;
                }
            }
            if (count != 819200) {
                logger.severe("found " + count + " integers, expected " + 819200);
            } else {
                this.dataValidated = true;
                logger.info("Received " + count + " integers in " + this.numChunks + " chunks");
            }
        }
    }
}

