/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.test;

import java.io.File;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.rules.ExternalResource;

/**
 * JUnit rule that drives a Kafka Streams processor topology inside the test process.
 *
 * <p>One of the {@code setUp} overloads builds the topology (and the global-state topology, if the
 * builder produces one), creates a {@link MockProcessorContext} backed by a {@link ThreadCache},
 * and initialises every state store and processor node. Records are then pushed through the
 * topology with {@link #process(String, Object, Object)}. Records that a processor sends to a
 * topic which has a source node in this driver are fed straight back into that source node
 * (see {@link MockRecordCollector}).
 *
 * <p>When used as a JUnit rule, {@link #after()} closes the driver if a topology was set up.
 */
public class KStreamTestDriver extends ExternalResource {

    /** Cache size in bytes (1 MiB) used by the {@code setUp} overloads that do not take one. */
    private static final long DEFAULT_CACHE_SIZE_BYTES = 1024L * 1024L;

    private ProcessorTopology topology;
    private MockProcessorContext context;
    private ProcessorTopology globalTopology;
    private final LogContext logContext = new LogContext("testCache ");

    /**
     * Sets up the driver with no state directory, byte-array serdes and the default cache size.
     *
     * @param builder the builder whose topology is driven
     * @deprecated use {@link #setUp(StreamsBuilder)}
     */
    @Deprecated
    public void setUp(KStreamBuilder builder) {
        setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
    }

    /**
     * Sets up the driver with byte-array serdes and the default cache size.
     *
     * @param builder the builder whose topology is driven
     * @param stateDir state directory handed to the processor context
     * @deprecated use {@link #setUp(StreamsBuilder, File)}
     */
    @Deprecated
    public void setUp(KStreamBuilder builder, File stateDir) {
        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
    }

    /**
     * Sets up the driver with byte-array serdes and an explicit cache size.
     *
     * @param builder the builder whose topology is driven
     * @param stateDir state directory handed to the processor context
     * @param cacheSize size in bytes of the {@link ThreadCache} created for the context
     * @deprecated use {@link #setUp(StreamsBuilder, File, long)}
     */
    @Deprecated
    public void setUp(KStreamBuilder builder, File stateDir, long cacheSize) {
        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
    }

    /**
     * Sets up the driver with explicit serdes and the default cache size.
     *
     * @param builder the builder whose topology is driven
     * @param stateDir state directory handed to the processor context
     * @param keySerde key serde handed to the processor context
     * @param valSerde value serde handed to the processor context
     * @deprecated use {@link #setUp(StreamsBuilder, File, Serde, Serde)}
     */
    @Deprecated
    public void setUp(KStreamBuilder builder, File stateDir, Serde<?> keySerde, Serde<?> valSerde) {
        setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
    }

    /**
     * Builds the topologies from {@code builder} and initialises the driver.
     *
     * @param builder the builder whose topology is driven; its application id is set to
     *     {@code "TestDriver"}
     * @param stateDir state directory handed to the processor context
     * @param keySerde key serde handed to the processor context
     * @param valSerde value serde handed to the processor context
     * @param cacheSize size in bytes of the {@link ThreadCache} created for the context
     * @deprecated use {@link #setUp(StreamsBuilder, File, Serde, Serde, long)}
     */
    @Deprecated
    public void setUp(KStreamBuilder builder, File stateDir, Serde<?> keySerde, Serde<?> valSerde, long cacheSize) {
        builder.setApplicationId("TestDriver");
        topology = builder.build(null);
        globalTopology = builder.buildGlobalStateTopology();
        initContextAndTopologies(stateDir, keySerde, valSerde, cacheSize);
    }

    /**
     * Sets up the driver with no state directory, byte-array serdes and the default cache size.
     *
     * @param builder the builder whose topology is driven
     */
    public void setUp(StreamsBuilder builder) {
        setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
    }

    /**
     * Sets up the driver with byte-array serdes and the default cache size.
     *
     * @param builder the builder whose topology is driven
     * @param stateDir state directory handed to the processor context
     */
    public void setUp(StreamsBuilder builder, File stateDir) {
        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
    }

    /**
     * Sets up the driver with byte-array serdes and an explicit cache size.
     *
     * @param builder the builder whose topology is driven
     * @param stateDir state directory handed to the processor context
     * @param cacheSize size in bytes of the {@link ThreadCache} created for the context
     */
    public void setUp(StreamsBuilder builder, File stateDir, long cacheSize) {
        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
    }

    /**
     * Sets up the driver with explicit serdes and the default cache size.
     *
     * @param builder the builder whose topology is driven
     * @param stateDir state directory handed to the processor context
     * @param keySerde key serde handed to the processor context
     * @param valSerde value serde handed to the processor context
     */
    public void setUp(StreamsBuilder builder, File stateDir, Serde<?> keySerde, Serde<?> valSerde) {
        setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
    }

    /**
     * Builds the topologies from {@code builder} and initialises the driver.
     *
     * @param builder the builder whose internal topology builder is used; its application id is
     *     set to {@code "TestDriver"}
     * @param stateDir state directory handed to the processor context
     * @param keySerde key serde handed to the processor context
     * @param valSerde value serde handed to the processor context
     * @param cacheSize size in bytes of the {@link ThreadCache} created for the context
     */
    public void setUp(StreamsBuilder builder, File stateDir, Serde<?> keySerde, Serde<?> valSerde, long cacheSize) {
        InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
        internalTopologyBuilder.setApplicationId("TestDriver");
        // The cast selects the build(Integer) overload.
        topology = internalTopologyBuilder.build((Integer) null);
        globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
        initContextAndTopologies(stateDir, keySerde, valSerde, cacheSize);
    }

    /**
     * Creates the processor context and initialises the already-built topologies.
     *
     * <p>Shared tail of both five-argument {@code setUp} overloads; {@link #topology} and
     * {@link #globalTopology} must have been assigned before this is called. The global
     * topology, when present, is initialised before the regular one.
     */
    private void initContextAndTopologies(File stateDir, Serde<?> keySerde, Serde<?> valSerde, long cacheSize) {
        ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
        context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
        context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "topic"));
        if (globalTopology != null) {
            initTopology(globalTopology, globalTopology.globalStateStores());
        }
        initTopology(topology, topology.stateStores());
    }

    /** Closes the driver after a test, but only if a topology was set up. */
    @Override
    protected void after() {
        if (topology != null) {
            close();
        }
    }

    /**
     * Initialises the given state stores, then every processor node of the given topology.
     *
     * <p>Each node is made the context's current node while it initialises; the current node is
     * reset to {@code null} afterwards, even if initialisation throws.
     */
    private void initTopology(ProcessorTopology topologyToInit, List<StateStore> stores) {
        for (StateStore store : stores) {
            store.init(context, store);
        }
        for (ProcessorNode node : topologyToInit.processors()) {
            context.setCurrentNode(node);
            try {
                node.init(context);
            } finally {
                context.setCurrentNode(null);
            }
        }
    }

    /** Returns the (non-global) topology built by the last {@code setUp} call. */
    public ProcessorTopology topology() {
        return topology;
    }

    /** Returns the processor context created by the last {@code setUp} call. */
    public ProcessorContext context() {
        return context;
    }

    /**
     * Forwards a record from the source node registered for {@code topicName}.
     *
     * <p>Does nothing when neither the regular nor the global topology has a source node for the
     * topic. The record context is rebuilt from the context's current timestamp, and the
     * previously current node is restored afterwards, even if forwarding throws.
     *
     * @param topicName topic whose source node receives the record
     * @param key record key
     * @param value record value
     */
    public void process(String topicName, Object key, Object value) {
        ProcessorNode prevNode = context.currentNode();
        ProcessorNode currNode = sourceNodeByTopicName(topicName);
        if (currNode == null) {
            return;
        }
        context.setRecordContext(createRecordContext(context.timestamp()));
        context.setCurrentNode(currNode);
        try {
            context.forward(key, value);
        } finally {
            context.setCurrentNode(prevNode);
        }
    }

    /**
     * Looks up the source node for a topic, checking the regular topology first and falling back
     * to the global topology when one exists.
     *
     * @return the source node, or {@code null} if neither topology has one for the topic
     */
    private ProcessorNode sourceNodeByTopicName(String topicName) {
        SourceNode topicNode = topology.source(topicName);
        if (topicNode == null && globalTopology != null) {
            topicNode = globalTopology.source(topicName);
        }
        return topicNode;
    }

    /**
     * Calls {@code punctuate(timestamp)} on every node of the regular topology that wraps a
     * processor.
     *
     * <p>Before each call the record context is rebuilt from {@code timestamp} and the node is
     * made current; the node that was current on entry is restored after each call.
     *
     * @param timestamp timestamp passed to each processor and used for the record context
     */
    public void punctuate(long timestamp) {
        ProcessorNode prevNode = context.currentNode();
        for (ProcessorNode processor : topology.processors()) {
            if (processor.processor() == null) {
                continue;
            }
            context.setRecordContext(createRecordContext(timestamp));
            context.setCurrentNode(processor);
            try {
                processor.processor().punctuate(timestamp);
            } finally {
                context.setCurrentNode(prevNode);
            }
        }
    }

    /** Sets the time on the processor context. */
    public void setTime(long timestamp) {
        context.setTime(timestamp);
    }

    /**
     * Closes every processor node of the regular topology, then flushes and closes all state
     * stores, then closes the context.
     *
     * <p>The state stores and the context are closed even when closing a processor throws, so a
     * failing processor does not leave stores open; the exception still propagates to the caller.
     */
    public void close() {
        try {
            for (ProcessorNode node : topology.processors()) {
                context.setCurrentNode(node);
                try {
                    node.close();
                } finally {
                    context.setCurrentNode(null);
                }
            }
        } finally {
            try {
                closeState();
            } finally {
                context.close();
            }
        }
    }

    /** Returns the names of all processor nodes in the regular topology. */
    public Set<String> allProcessorNames() {
        Set<String> names = new HashSet<String>();
        for (ProcessorNode node : topology.processors()) {
            names.add(node.name());
        }
        return names;
    }

    /**
     * Finds a processor node of the regular topology by name.
     *
     * @return the first node with the given name, or {@code null} if there is none
     */
    public ProcessorNode processor(String name) {
        for (ProcessorNode node : topology.processors()) {
            if (node.name().equals(name)) {
                return node;
            }
        }
        return null;
    }

    /** Returns the state stores known to the processor context, keyed by name. */
    public Map<String, StateStore> allStateStores() {
        return context.allStateStores();
    }

    /** Flushes every state store known to the processor context. */
    public void flushState() {
        for (StateStore stateStore : context.allStateStores().values()) {
            stateStore.flush();
        }
    }

    /** Flushes, then closes, every state store known to the processor context. */
    private void closeState() {
        flushState();
        for (StateStore stateStore : context.allStateStores().values()) {
            stateStore.close();
        }
    }

    /** Creates a record context for {@code timestamp} with offset -1, partition -1 and topic "topic". */
    private ProcessorRecordContext createRecordContext(long timestamp) {
        return new ProcessorRecordContext(timestamp, -1L, -1, "topic");
    }

    /**
     * Record collector that never produces to a broker. A record sent to a topic that has a
     * source node in this driver is processed by the driver again; any other record is dropped.
     */
    private class MockRecordCollector extends RecordCollectorImpl {

        MockRecordCollector() {
            super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver "), new DefaultProductionExceptionHandler());
        }

        @Override
        public <K, V> void send(String topic, K key, V value, Long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
            // Only loop the record back when a source node in this driver consumes the topic.
            if (sourceNodeByTopicName(topic) != null) {
                process(topic, key, value);
            }
        }

        @Override
        public <K, V> void send(String topic, K key, V value, Integer partition, Long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            // Only loop the record back when a source node in this driver consumes the topic.
            if (sourceNodeByTopicName(topic) != null) {
                process(topic, key, value);
            }
        }

        /** No-op: nothing is buffered. */
        @Override
        public void flush() {
        }

        /** No-op: there is no producer to close. */
        @Override
        public void close() {
        }
    }
}

