/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.lang.reflect.Array;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamBranch;
import org.apache.kafka.streams.kstream.internals.KStreamFilter;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMap;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamForeach;
import org.apache.kafka.streams.kstream.internals.KStreamJoinWindow;
import org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin;
import org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin;
import org.apache.kafka.streams.kstream.internals.KStreamMap;
import org.apache.kafka.streams.kstream.internals.KStreamMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamPassThrough;
import org.apache.kafka.streams.kstream.internals.KStreamTransform;
import org.apache.kafka.streams.kstream.internals.KStreamTransformValues;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KeyValuePrinter;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;

public class KStreamImpl<K, V>
extends AbstractStream<K>
implements KStream<K, V> {
    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
    public static final String FILTER_NAME = "KSTREAM-FILTER-";
    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
    public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
    public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
    private static final String MAP_NAME = "KSTREAM-MAP-";
    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
    public static final String MERGE_NAME = "KSTREAM-MERGE-";
    public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
    public static final String SINK_NAME = "KSTREAM-SINK-";
    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
    private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
    public static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
    private final boolean repartitionRequired;

    public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes, boolean repartitionRequired) {
        super(topology, name, sourceNodes);
        this.repartitionRequired = repartitionRequired;
    }

    @Override
    public KStream<K, V> filter(Predicate<K, V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        String name = this.topology.newName(FILTER_NAME);
        this.topology.addProcessor(name, new KStreamFilter<K, V>(predicate, false), this.name);
        return new KStreamImpl<K, V>(this.topology, name, this.sourceNodes, this.repartitionRequired);
    }

    @Override
    public KStream<K, V> filterNot(Predicate<K, V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        String name = this.topology.newName(FILTER_NAME);
        this.topology.addProcessor(name, new KStreamFilter<K, V>(predicate, true), this.name);
        return new KStreamImpl<K, V>(this.topology, name, this.sourceNodes, this.repartitionRequired);
    }

    @Override
    public <K1> KStream<K1, V> selectKey(KeyValueMapper<K, V, K1> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return new KStreamImpl<K, V>(this.topology, this.internalSelectKey(mapper), this.sourceNodes, true);
    }

    private <K1> String internalSelectKey(final KeyValueMapper<K, V, K1> mapper) {
        String name = this.topology.newName(KEY_SELECT_NAME);
        this.topology.addProcessor(name, new KStreamMap(new KeyValueMapper<K, V, KeyValue<K1, V>>(){

            @Override
            public KeyValue<K1, V> apply(K key, V value) {
                return new KeyValue(mapper.apply(key, value), value);
            }
        }), this.name);
        return name;
    }

    @Override
    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.topology.newName(MAP_NAME);
        this.topology.addProcessor(name, new KStreamMap<K, V, K1, V1>(mapper), this.name);
        return new KStreamImpl<K, V>(this.topology, name, this.sourceNodes, true);
    }

    @Override
    public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.topology.newName(MAPVALUES_NAME);
        this.topology.addProcessor(name, new KStreamMapValues(mapper), this.name);
        return new KStreamImpl<K, V>(this.topology, name, this.sourceNodes, this.repartitionRequired);
    }

    @Override
    public void print() {
        this.print(null, null, null);
    }

    @Override
    public void print(String streamName) {
        this.print(null, null, streamName);
    }

    @Override
    public void print(Serde<K> keySerde, Serde<V> valSerde) {
        this.print(keySerde, valSerde, null);
    }

    @Override
    public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) {
        String name = this.topology.newName(PRINTING_NAME);
        streamName = streamName == null ? this.name : streamName;
        this.topology.addProcessor(name, new KeyValuePrinter(keySerde, valSerde, streamName), this.name);
    }

    @Override
    public void writeAsText(String filePath) {
        this.writeAsText(filePath, null, null, null);
    }

    @Override
    public void writeAsText(String filePath, String streamName) {
        this.writeAsText(filePath, streamName, null, null);
    }

    @Override
    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
        this.writeAsText(filePath, null, keySerde, valSerde);
    }

    @Override
    public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
        Objects.requireNonNull(filePath, "filePath can't be null");
        String name = this.topology.newName(PRINTING_NAME);
        streamName = streamName == null ? this.name : streamName;
        try {
            PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
            this.topology.addProcessor(name, new KeyValuePrinter(printStream, keySerde, valSerde, streamName), this.name);
        }
        catch (FileNotFoundException e) {
            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
            throw new TopologyBuilderException(message);
        }
    }

    @Override
    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.topology.newName(FLATMAP_NAME);
        this.topology.addProcessor(name, new KStreamFlatMap<K, V, K1, V1>(mapper), this.name);
        return new KStreamImpl<K, V>(this.topology, name, this.sourceNodes, true);
    }

    @Override
    public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.topology.newName(FLATMAPVALUES_NAME);
        this.topology.addProcessor(name, new KStreamFlatMapValues(mapper), this.name);
        return new KStreamImpl<K, V>(this.topology, name, this.sourceNodes, this.repartitionRequired);
    }

    @Override
    public KStream<K, V>[] branch(Predicate<K, V> ... predicates) {
        if (predicates.length == 0) {
            throw new IllegalArgumentException("you must provide at least one predicate");
        }
        for (Predicate<K, V> predicate : predicates) {
            Objects.requireNonNull(predicate, "predicates can't have null values");
        }
        String branchName = this.topology.newName(BRANCH_NAME);
        this.topology.addProcessor(branchName, new KStreamBranch((Predicate[])predicates.clone()), this.name);
        KStream[] branchChildren = (KStream[])Array.newInstance(KStream.class, predicates.length);
        for (int i = 0; i < predicates.length; ++i) {
            String childName = this.topology.newName(BRANCHCHILD_NAME);
            this.topology.addProcessor(childName, new KStreamPassThrough(), branchName);
            branchChildren[i] = new KStreamImpl<K, V>(this.topology, childName, this.sourceNodes, this.repartitionRequired);
        }
        return branchChildren;
    }

    public static <K, V> KStream<K, V> merge(KStreamBuilder topology, KStream<K, V>[] streams) {
        if (streams == null || streams.length == 0) {
            throw new IllegalArgumentException("Parameter <streams> must not be null or has length zero");
        }
        String name = topology.newName(MERGE_NAME);
        String[] parentNames = new String[streams.length];
        HashSet<String> allSourceNodes = new HashSet<String>();
        boolean requireRepartitioning = false;
        for (int i = 0; i < streams.length; ++i) {
            KStreamImpl stream = (KStreamImpl)streams[i];
            parentNames[i] = stream.name;
            requireRepartitioning |= stream.repartitionRequired;
            allSourceNodes.addAll(stream.sourceNodes);
        }
        topology.addProcessor(name, new KStreamPassThrough(), parentNames);
        return new KStreamImpl<K, V>(topology, name, allSourceNodes, requireRepartitioning);
    }

    @Override
    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
        this.to(keySerde, valSerde, partitioner, topic);
        return this.topology.stream(keySerde, valSerde, topic);
    }

    @Override
    public void foreach(ForeachAction<K, V> action) {
        Objects.requireNonNull(action, "action can't be null");
        String name = this.topology.newName(FOREACH_NAME);
        this.topology.addProcessor(name, new KStreamForeach<K, V>(action), this.name);
    }

    @Override
    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        return this.through(keySerde, valSerde, null, topic);
    }

    @Override
    public KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
        return this.through(null, null, partitioner, topic);
    }

    @Override
    public KStream<K, V> through(String topic) {
        return this.through(null, null, null, topic);
    }

    @Override
    public void to(String topic) {
        this.to(null, null, null, topic);
    }

    @Override
    public void to(StreamPartitioner<K, V> partitioner, String topic) {
        this.to(null, null, partitioner, topic);
    }

    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        this.to(keySerde, valSerde, null, topic);
    }

    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
        Serializer valSerializer;
        Objects.requireNonNull(topic, "topic can't be null");
        String name = this.topology.newName(SINK_NAME);
        Serializer keySerializer = keySerde == null ? null : keySerde.serializer();
        Serializer serializer = valSerializer = valSerde == null ? null : valSerde.serializer();
        if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
            WindowedSerializer windowedSerializer = (WindowedSerializer)keySerializer;
            partitioner = new WindowedStreamPartitioner(windowedSerializer);
        }
        this.topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
    }

    @Override
    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        String name = this.topology.newName(TRANSFORM_NAME);
        this.topology.addProcessor(name, new KStreamTransform<K, V, K1, V1>(transformerSupplier), this.name);
        this.topology.connectProcessorAndStateStores(name, stateStoreNames);
        return new KStreamImpl<K, V>(this.topology, name, this.sourceNodes, true);
    }

    @Override
    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
        String name = this.topology.newName(TRANSFORMVALUES_NAME);
        this.topology.addProcessor(name, new KStreamTransformValues(valueTransformerSupplier), this.name);
        this.topology.connectProcessorAndStateStores(name, stateStoreNames);
        return new KStreamImpl<K, V>(this.topology, name, this.sourceNodes, this.repartitionRequired);
    }

    @Override
    public void process(ProcessorSupplier<K, V> processorSupplier, String ... stateStoreNames) {
        String name = this.topology.newName(PROCESSOR_NAME);
        this.topology.addProcessor(name, processorSupplier, this.name);
        this.topology.connectProcessorAndStateStores(name, stateStoreNames);
    }

    @Override
    public <V1, R> KStream<K, R> join(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde) {
        return this.join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false);
    }

    @Override
    public <V1, R> KStream<K, R> join(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows) {
        return this.join(other, joiner, windows, null, null, null, false);
    }

    @Override
    public <V1, R> KStream<K, R> outerJoin(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde) {
        return this.join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true);
    }

    @Override
    public <V1, R> KStream<K, R> outerJoin(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows) {
        return this.join(other, joiner, windows, null, null, null, true);
    }

    private <V1, R> KStream<K, R> join(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde, boolean outer) {
        return this.doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new DefaultJoin(outer));
    }

    private <V1, R> KStream<K, R> doJoin(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde, KStreamImplJoin join) {
        Objects.requireNonNull(other, "other KStream can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(windows, "windows can't be null");
        KStreamImpl<K, V> joinThis = this;
        KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>)other;
        if (joinThis.repartitionRequired) {
            joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde, null);
        }
        if (joinOther.repartitionRequired) {
            joinOther = joinOther.repartitionForJoin(keySerde, otherValueSerde, null);
        }
        joinThis.ensureJoinableWith(joinOther);
        return join.join(joinThis, joinOther, joiner, windows, keySerde, thisValueSerde, otherValueSerde);
    }

    private KStreamImpl<K, V> repartitionForJoin(Serde<K> keySerde, Serde<V> valSerde, String topicNamePrefix) {
        String repartitionedSourceName = KStreamImpl.createReparitionedSource(this, keySerde, valSerde, topicNamePrefix);
        return new KStreamImpl<K, V>(this.topology, repartitionedSourceName, Collections.singleton(repartitionedSourceName), false);
    }

    static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream, Serde<K1> keySerde, Serde<V1> valSerde, String topicNamePrefix) {
        Serializer keySerializer = keySerde != null ? keySerde.serializer() : null;
        Serializer valSerializer = valSerde != null ? valSerde.serializer() : null;
        Deserializer keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
        Deserializer valDeserializer = valSerde != null ? valSerde.deserializer() : null;
        String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name;
        String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
        String sinkName = stream.topology.newName(SINK_NAME);
        String filterName = stream.topology.newName(FILTER_NAME);
        String sourceName = stream.topology.newName(SOURCE_NAME);
        stream.topology.addInternalTopic(repartitionTopic);
        stream.topology.addProcessor(filterName, new KStreamFilter(new Predicate<K1, V1>(){

            @Override
            public boolean test(K1 key, V1 value) {
                return key != null;
            }
        }, false), stream.name);
        stream.topology.addSink(sinkName, repartitionTopic, keySerializer, valSerializer, filterName);
        stream.topology.addSource(sourceName, keyDeserializer, valDeserializer, repartitionTopic);
        return sourceName;
    }

    @Override
    public <V1, R> KStream<K, R> leftJoin(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValSerde, Serde<V1> otherValueSerde) {
        return this.doJoin(other, joiner, windows, keySerde, thisValSerde, otherValueSerde, new LeftJoin());
    }

    @Override
    public <V1, R> KStream<K, R> leftJoin(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows) {
        return this.leftJoin(other, joiner, windows, null, null, null);
    }

    @Override
    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
        return this.leftJoin(other, joiner, (Serde<K>)null, (Serde<V>)null);
    }

    @Override
    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner, Serde<K> keySerde, Serde<V> valueSerde) {
        Objects.requireNonNull(other, "other KTable can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        if (this.repartitionRequired) {
            KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde, valueSerde, null);
            return super.doStreamTableLeftJoin(other, joiner);
        }
        return this.doStreamTableLeftJoin(other, joiner);
    }

    private <V1, R> KStream<K, R> doStreamTableLeftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
        Set<String> allSourceNodes = this.ensureJoinableWith((AbstractStream)((Object)other));
        String name = this.topology.newName(LEFTJOIN_NAME);
        this.topology.addProcessor(name, new KStreamKTableLeftJoin((KTableImpl)other, joiner), this.name);
        this.topology.connectProcessorAndStateStores(name, ((KTableImpl)other).valueGetterSupplier().storeNames());
        this.topology.connectProcessors(this.name, ((KTableImpl)other).name);
        return new KStreamImpl<K, V>(this.topology, name, allSourceNodes, false);
    }

    @Override
    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector) {
        return this.groupBy(selector, null, null);
    }

    @Override
    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector, Serde<K1> keySerde, Serde<V> valSerde) {
        Objects.requireNonNull(selector, "selector can't be null");
        String selectName = this.internalSelectKey(selector);
        return new KGroupedStreamImpl<K1, V>(this.topology, selectName, this.sourceNodes, keySerde, valSerde, true);
    }

    @Override
    public KGroupedStream<K, V> groupByKey() {
        return this.groupByKey(null, null);
    }

    @Override
    public KGroupedStream<K, V> groupByKey(Serde<K> keySerde, Serde<V> valSerde) {
        return new KGroupedStreamImpl<K, V>(this.topology, this.name, this.sourceNodes, keySerde, valSerde, this.repartitionRequired);
    }

    private static <K, V> StateStoreSupplier createWindowedStateStore(JoinWindows windows, Serde<K> keySerde, Serde<V> valueSerde, String storeName) {
        return Stores.create(storeName).withKeys(keySerde).withValues(valueSerde).persistent().windowed(windows.size(), windows.maintainMs(), windows.segments, true).build();
    }

    private class LeftJoin
    implements KStreamImplJoin {
        private LeftJoin() {
        }

        @Override
        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs, KStream<K1, V2> other, ValueJoiner<V1, V2, R> joiner, JoinWindows windows, Serde<K1> keySerde, Serde<V1> lhsValueSerde, Serde<V2> otherValueSerde) {
            String otherWindowStreamName = KStreamImpl.this.topology.newName(KStreamImpl.WINDOWED_NAME);
            String joinThisName = KStreamImpl.this.topology.newName(KStreamImpl.LEFTJOIN_NAME);
            StateStoreSupplier otherWindow = KStreamImpl.createWindowedStateStore(windows, keySerde, otherValueSerde, joinThisName + "-store");
            KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow(otherWindow.name(), windows.before + windows.after + 1L, windows.maintainMs());
            KStreamKStreamJoin joinThis = new KStreamKStreamJoin(otherWindow.name(), windows.before, windows.after, joiner, true);
            KStreamImpl.this.topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream)((Object)other)).name);
            KStreamImpl.this.topology.addProcessor(joinThisName, joinThis, ((AbstractStream)((Object)lhs)).name);
            KStreamImpl.this.topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
            HashSet<String> allSourceNodes = new HashSet<String>(((AbstractStream)((Object)lhs)).sourceNodes);
            allSourceNodes.addAll(((KStreamImpl)other).sourceNodes);
            return new KStreamImpl(KStreamImpl.this.topology, joinThisName, allSourceNodes, false);
        }
    }

    private class DefaultJoin
    implements KStreamImplJoin {
        private final boolean outer;

        DefaultJoin(boolean outer) {
            this.outer = outer;
        }

        @Override
        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs, KStream<K1, V2> other, ValueJoiner<V1, V2, R> joiner, JoinWindows windows, Serde<K1> keySerde, Serde<V1> lhsValueSerde, Serde<V2> otherValueSerde) {
            String thisWindowStreamName = KStreamImpl.this.topology.newName(KStreamImpl.WINDOWED_NAME);
            String otherWindowStreamName = KStreamImpl.this.topology.newName(KStreamImpl.WINDOWED_NAME);
            String joinThisName = this.outer ? KStreamImpl.this.topology.newName(KStreamImpl.OUTERTHIS_NAME) : KStreamImpl.this.topology.newName(KStreamImpl.JOINTHIS_NAME);
            String joinOtherName = this.outer ? KStreamImpl.this.topology.newName(KStreamImpl.OUTEROTHER_NAME) : KStreamImpl.this.topology.newName(KStreamImpl.JOINOTHER_NAME);
            String joinMergeName = KStreamImpl.this.topology.newName(KStreamImpl.MERGE_NAME);
            StateStoreSupplier thisWindow = KStreamImpl.createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store");
            StateStoreSupplier otherWindow = KStreamImpl.createWindowedStateStore(windows, keySerde, otherValueSerde, joinOtherName + "-store");
            KStreamJoinWindow thisWindowedStream = new KStreamJoinWindow(thisWindow.name(), windows.before + windows.after + 1L, windows.maintainMs());
            KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow(otherWindow.name(), windows.before + windows.after + 1L, windows.maintainMs());
            KStreamKStreamJoin joinThis = new KStreamKStreamJoin(otherWindow.name(), windows.before, windows.after, joiner, this.outer);
            KStreamKStreamJoin joinOther = new KStreamKStreamJoin(thisWindow.name(), windows.after, windows.before, AbstractStream.reverseJoiner(joiner), this.outer);
            KStreamPassThrough joinMerge = new KStreamPassThrough();
            KStreamImpl.this.topology.addProcessor(thisWindowStreamName, thisWindowedStream, ((AbstractStream)((Object)lhs)).name);
            KStreamImpl.this.topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream)((Object)other)).name);
            KStreamImpl.this.topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
            KStreamImpl.this.topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
            KStreamImpl.this.topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
            KStreamImpl.this.topology.addStateStore(thisWindow, thisWindowStreamName, joinOtherName);
            KStreamImpl.this.topology.addStateStore(otherWindow, otherWindowStreamName, joinThisName);
            HashSet<String> allSourceNodes = new HashSet<String>(((AbstractStream)((Object)lhs)).sourceNodes);
            allSourceNodes.addAll(((KStreamImpl)other).sourceNodes);
            return new KStreamImpl(KStreamImpl.this.topology, joinMergeName, allSourceNodes, false);
        }
    }

    private static interface KStreamImplJoin {
        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> var1, KStream<K1, V2> var2, ValueJoiner<V1, V2, R> var3, JoinWindows var4, Serde<K1> var5, Serde<V1> var6, Serde<V2> var7);
    }
}

