/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.lang.reflect.Array;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamBranch;
import org.apache.kafka.streams.kstream.internals.KStreamFilter;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMap;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamGlobalKTableJoin;
import org.apache.kafka.streams.kstream.internals.KStreamJoinWindow;
import org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin;
import org.apache.kafka.streams.kstream.internals.KStreamKTableJoin;
import org.apache.kafka.streams.kstream.internals.KStreamMap;
import org.apache.kafka.streams.kstream.internals.KStreamMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamPassThrough;
import org.apache.kafka.streams.kstream.internals.KStreamPeek;
import org.apache.kafka.streams.kstream.internals.KStreamTransform;
import org.apache.kafka.streams.kstream.internals.KStreamTransformValues;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.PrintedInternal;
import org.apache.kafka.streams.kstream.internals.ProducedInternal;
import org.apache.kafka.streams.kstream.internals.SerializedInternal;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class KStreamImpl<K, V>
extends AbstractStream<K>
implements KStream<K, V> {
    // Prefixes passed to InternalStreamsBuilder.newProcessorName(...) when a node is added to the
    // topology. Several of them are not referenced in this part of the file.
    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    static final String SINK_NAME = "KSTREAM-SINK-";
    // Appended to the base name in createReparitionedSource(...) to form the repartition topic name.
    static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
    private static final String FILTER_NAME = "KSTREAM-FILTER-";
    private static final String PEEK_NAME = "KSTREAM-PEEK-";
    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
    private static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    private static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
    private static final String JOIN_NAME = "KSTREAM-JOIN-";
    private static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
    private static final String MAP_NAME = "KSTREAM-MAP-";
    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
    private static final String MERGE_NAME = "KSTREAM-MERGE-";
    private static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    private static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
    private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
    // Renders a record as "key, value"; used by the print()/writeAsText() overloads that take no mapper.
    private final KeyValueMapper<K, V, String> defaultKeyValueMapper;
    // When true, the join methods first route this stream through a repartition topic
    // (see repartitionForJoin) before joining.
    private final boolean repartitionRequired;

    /**
     * Creates a stream handle for the topology node {@code name}.
     *
     * @param builder             forwarded to the {@link AbstractStream} constructor
     * @param name                forwarded to the {@link AbstractStream} constructor
     * @param sourceNodes         forwarded to the {@link AbstractStream} constructor
     * @param repartitionRequired stored as-is; consulted by the join methods before they join
     */
    public KStreamImpl(InternalStreamsBuilder builder, String name, Set<String> sourceNodes, boolean repartitionRequired) {
        super(builder, name, sourceNodes);
        // Fallback formatter for the print()/writeAsText() overloads that are not given a mapper.
        this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>(){
            @Override
            public String apply(K key, V value) {
                return String.format("%s, %s", key, value);
            }
        };
        this.repartitionRequired = repartitionRequired;
    }

    /**
     * Adds a {@code KStreamFilter} node (constructed with flag {@code false}) below this stream's node.
     *
     * @param predicate the predicate handed to the filter; must not be null
     * @return a stream on the new node with the same source nodes and repartition flag as this one
     * @throws NullPointerException if {@code predicate} is null
     */
    @Override
    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        final String filterNode = builder.newProcessorName(FILTER_NAME);
        final KStreamFilter<K, V> supplier = new KStreamFilter<K, V>(predicate, false);
        builder.internalTopologyBuilder.addProcessor(filterNode, supplier, this.name);
        return new KStreamImpl<K, V>(builder, filterNode, sourceNodes, repartitionRequired);
    }

    /**
     * Adds a {@code KStreamFilter} node (constructed with flag {@code true}) below this stream's node.
     *
     * @param predicate the predicate handed to the filter; must not be null
     * @return a stream on the new node with the same source nodes and repartition flag as this one
     * @throws NullPointerException if {@code predicate} is null
     */
    @Override
    public KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        final String filterNode = builder.newProcessorName(FILTER_NAME);
        final KStreamFilter<K, V> supplier = new KStreamFilter<K, V>(predicate, true);
        builder.internalTopologyBuilder.addProcessor(filterNode, supplier, this.name);
        return new KStreamImpl<K, V>(builder, filterNode, sourceNodes, repartitionRequired);
    }

    /**
     * Re-keys the stream with {@code mapper}; values are left unchanged.
     *
     * <p>The returned stream is flagged as requiring repartitioning, because the key may have changed.
     *
     * @param mapper computes the new key from the current key and value; must not be null
     * @return a stream keyed by {@code K1} on the newly added key-select node
     * @throws NullPointerException if {@code mapper} is null
     */
    @Override
    public <K1> KStream<K1, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        final String selectNode = this.internalSelectKey(mapper);
        // The result is keyed by K1, so it must be parameterized <K1, V> to match the declared return type.
        return new KStreamImpl<K1, V>(this.builder, selectNode, this.sourceNodes, true);
    }

    /**
     * Adds a {@code KStreamMap} node that replaces each record's key with {@code mapper}'s result
     * and keeps the value as it was.
     *
     * @param mapper computes the new key from the current key and value
     * @return the generated name of the node that was added
     */
    private <K1> String internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
        final String selectNode = this.builder.newProcessorName(KEY_SELECT_NAME);
        final KeyValueMapper<K, V, KeyValue<K1, V>> rekey = new KeyValueMapper<K, V, KeyValue<K1, V>>(){
            @Override
            public KeyValue<K1, V> apply(K key, V value) {
                final K1 newKey = mapper.apply(key, value);
                return new KeyValue<K1, V>(newKey, value);
            }
        };
        this.builder.internalTopologyBuilder.addProcessor(selectNode, new KStreamMap(rekey), this.name);
        return selectNode;
    }

    /**
     * Adds a {@code KStreamMap} node that turns each record into the key/value pair produced by {@code mapper}.
     *
     * <p>The returned stream is flagged as requiring repartitioning, because the key may have changed.
     *
     * @param mapper produces the new key/value pair; must not be null
     * @return a stream of {@code K1}/{@code V1} records on the new node
     * @throws NullPointerException if {@code mapper} is null
     */
    @Override
    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.builder.newProcessorName(MAP_NAME);
        this.builder.internalTopologyBuilder.addProcessor(name, new KStreamMap(mapper), this.name);
        // Parameterized with the mapped types so the result matches the declared KStream<K1, V1>.
        return new KStreamImpl<K1, V1>(this.builder, name, this.sourceNodes, true);
    }

    /**
     * Key-agnostic variant of {@code mapValues}: wraps {@code mapper} with {@code withKey(...)}
     * and delegates to the {@link ValueMapperWithKey} overload.
     *
     * @param mapper maps the current value to the new value
     * @return the stream returned by the key-aware overload
     */
    @Override
    public <V1> KStream<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper) {
        return mapValues(withKey(mapper));
    }

    /**
     * Adds a {@code KStreamMapValues} node that replaces each value with {@code mapper}'s result.
     *
     * <p>Keys are untouched, so the repartition flag of this stream is carried over unchanged.
     *
     * @param mapper maps (key, value) to the new value; must not be null
     * @return a stream of {@code K}/{@code VR} records on the new node
     * @throws NullPointerException if {@code mapper} is null
     */
    @Override
    public <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.builder.newProcessorName(MAPVALUES_NAME);
        this.builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<K, V, VR>(mapper), this.name);
        // The value type changes to VR; parameterize the result to match the declared return type.
        return new KStreamImpl<K, VR>(this.builder, name, this.sourceNodes, this.repartitionRequired);
    }

    /** Prints using the default "key, value" mapper, no serdes, and this stream's node name as the label. */
    @Override
    public void print() {
        print(defaultKeyValueMapper, null, null, name);
    }

    /**
     * Prints using the default "key, value" mapper and no serdes.
     *
     * @param label label forwarded to the four-argument overload
     */
    @Override
    public void print(String label) {
        print(defaultKeyValueMapper, null, null, label);
    }

    /**
     * Prints using the default "key, value" mapper, labelled with this stream's node name.
     *
     * @param keySerde forwarded to the four-argument overload
     * @param valSerde forwarded to the four-argument overload
     */
    @Override
    public void print(Serde<K> keySerde, Serde<V> valSerde) {
        print(defaultKeyValueMapper, keySerde, valSerde, name);
    }

    /**
     * Prints using the default "key, value" mapper.
     *
     * @param keySerde forwarded to the four-argument overload
     * @param valSerde forwarded to the four-argument overload
     * @param label    label forwarded to the four-argument overload
     */
    @Override
    public void print(Serde<K> keySerde, Serde<V> valSerde, String label) {
        print(defaultKeyValueMapper, keySerde, valSerde, label);
    }

    /**
     * Prints using {@code mapper}, no serdes, and this stream's node name as the label.
     *
     * @param mapper formats each record as a string
     */
    @Override
    public void print(KeyValueMapper<? super K, ? super V, String> mapper) {
        print(mapper, null, null, name);
    }

    /**
     * Prints using {@code mapper} and no serdes.
     *
     * @param mapper formats each record as a string
     * @param label  label forwarded to the four-argument overload
     */
    @Override
    public void print(KeyValueMapper<? super K, ? super V, String> mapper, String label) {
        print(mapper, null, null, label);
    }

    /**
     * Prints using {@code mapper}, labelled with this stream's node name.
     *
     * @param mapper   formats each record as a string
     * @param keySerde forwarded to the four-argument overload
     * @param valSerde forwarded to the four-argument overload
     */
    @Override
    public void print(KeyValueMapper<? super K, ? super V, String> mapper, Serde<K> keySerde, Serde<V> valSerde) {
        print(mapper, keySerde, valSerde, name);
    }

    /**
     * Builds a {@code Printed.toSysOut()} configuration with the given label and mapper and
     * delegates to {@link #print(Printed)}.
     *
     * @param mapper   formats each record as a string; must not be null
     * @param keySerde accepted but not used by this method
     * @param valSerde accepted but not used by this method
     * @param label    label applied to the printed configuration; must not be null
     * @throws NullPointerException if {@code mapper} or {@code label} is null
     */
    @Override
    public void print(KeyValueMapper<? super K, ? super V, String> mapper, Serde<K> keySerde, Serde<V> valSerde, String label) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        Objects.requireNonNull(label, "label can't be null");
        final Printed<K, V> sysOut = Printed.toSysOut();
        print(sysOut.withLabel(label).withKeyValueMapper(mapper));
    }

    /**
     * Adds a printing node, built from {@code printed}, below this stream's node.
     *
     * @param printed the print configuration; must not be null
     * @throws NullPointerException if {@code printed} is null
     */
    @Override
    public void print(Printed<K, V> printed) {
        Objects.requireNonNull(printed, "printed can't be null");
        final PrintedInternal<K, V> internal = new PrintedInternal<K, V>(printed);
        final String printerNode = builder.newProcessorName(PRINTING_NAME);
        // The parent node's name is passed to build(...) as well as being the parent of the new node.
        builder.internalTopologyBuilder.addProcessor(printerNode, internal.build(this.name), this.name);
    }

    /**
     * Writes to {@code filePath} with the default "key, value" mapper, no serdes, and this
     * stream's node name as the label.
     *
     * @param filePath target file path
     */
    @Override
    public void writeAsText(String filePath) {
        writeAsText(filePath, name, null, null, defaultKeyValueMapper);
    }

    /**
     * Writes to {@code filePath} with the default "key, value" mapper and no serdes.
     *
     * @param filePath target file path
     * @param label    label forwarded to the five-argument overload
     */
    @Override
    public void writeAsText(String filePath, String label) {
        writeAsText(filePath, label, null, null, defaultKeyValueMapper);
    }

    /**
     * Writes to {@code filePath} with the default "key, value" mapper, labelled with this
     * stream's node name.
     *
     * @param filePath target file path
     * @param keySerde forwarded to the five-argument overload
     * @param valSerde forwarded to the five-argument overload
     */
    @Override
    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
        writeAsText(filePath, name, keySerde, valSerde, defaultKeyValueMapper);
    }

    /**
     * Writes to {@code filePath} with the default "key, value" mapper.
     *
     * @param filePath target file path
     * @param label    label forwarded to the five-argument overload
     * @param keySerde forwarded to the five-argument overload
     * @param valSerde forwarded to the five-argument overload
     */
    @Override
    public void writeAsText(String filePath, String label, Serde<K> keySerde, Serde<V> valSerde) {
        writeAsText(filePath, label, keySerde, valSerde, defaultKeyValueMapper);
    }

    /**
     * Writes to {@code filePath} with {@code mapper}, no serdes, and this stream's node name as the label.
     *
     * @param filePath target file path
     * @param mapper   formats each record as a string
     */
    @Override
    public void writeAsText(String filePath, KeyValueMapper<? super K, ? super V, String> mapper) {
        writeAsText(filePath, name, null, null, mapper);
    }

    /**
     * Writes to {@code filePath} with {@code mapper} and no serdes.
     *
     * @param filePath target file path
     * @param label    label forwarded to the five-argument overload
     * @param mapper   formats each record as a string
     */
    @Override
    public void writeAsText(String filePath, String label, KeyValueMapper<? super K, ? super V, String> mapper) {
        writeAsText(filePath, label, null, null, mapper);
    }

    /**
     * Writes to {@code filePath} with {@code mapper}, labelled with this stream's node name.
     *
     * @param filePath target file path
     * @param keySerde forwarded to the five-argument overload
     * @param valSerde forwarded to the five-argument overload
     * @param mapper   formats each record as a string
     */
    @Override
    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde, KeyValueMapper<? super K, ? super V, String> mapper) {
        writeAsText(filePath, name, keySerde, valSerde, mapper);
    }

    /**
     * Builds a {@code Printed.toFile(filePath)} configuration with the given mapper and label
     * and delegates to {@link #print(Printed)}.
     *
     * @param filePath target file path; must not be null
     * @param label    label applied to the printed configuration; must not be null
     * @param keySerde accepted but not used by this method
     * @param valSerde accepted but not used by this method
     * @param mapper   formats each record as a string; must not be null
     * @throws NullPointerException if {@code filePath}, {@code label} or {@code mapper} is null
     */
    @Override
    public void writeAsText(String filePath, String label, Serde<K> keySerde, Serde<V> valSerde, KeyValueMapper<? super K, ? super V, String> mapper) {
        Objects.requireNonNull(filePath, "filePath can't be null");
        Objects.requireNonNull(label, "label can't be null");
        Objects.requireNonNull(mapper, "mapper can't be null");
        final Printed<K, V> toFile = Printed.toFile(filePath);
        print(toFile.withKeyValueMapper(mapper).withLabel(label));
    }

    /**
     * Adds a {@code KStreamFlatMap} node that expands each record into the key/value pairs
     * produced by {@code mapper}.
     *
     * <p>The returned stream is flagged as requiring repartitioning, because keys may have changed.
     *
     * @param mapper produces the new key/value pairs for a record; must not be null
     * @return a stream of {@code K1}/{@code V1} records on the new node
     * @throws NullPointerException if {@code mapper} is null
     */
    @Override
    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.builder.newProcessorName(FLATMAP_NAME);
        this.builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMap(mapper), this.name);
        // Parameterized with the mapped types so the result matches the declared KStream<K1, V1>.
        return new KStreamImpl<K1, V1>(this.builder, name, this.sourceNodes, true);
    }

    /**
     * Key-agnostic variant of {@code flatMapValues}: wraps {@code mapper} with {@code withKey(...)}
     * and delegates to the {@link ValueMapperWithKey} overload.
     *
     * @param mapper maps the current value to the new values
     * @return the stream returned by the key-aware overload
     */
    @Override
    public <V1> KStream<K, V1> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
        return flatMapValues(withKey(mapper));
    }

    /**
     * Adds a {@code KStreamFlatMapValues} node that expands each value into the values
     * produced by {@code mapper}.
     *
     * <p>Keys are untouched, so the repartition flag of this stream is carried over unchanged.
     *
     * @param mapper maps (key, value) to the new values; must not be null
     * @return a stream of {@code K}/{@code VR} records on the new node
     * @throws NullPointerException if {@code mapper} is null
     */
    @Override
    public <VR> KStream<K, VR> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.builder.newProcessorName(FLATMAPVALUES_NAME);
        this.builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMapValues(mapper), this.name);
        // The value type changes to VR; parameterize the result to match the declared return type.
        return new KStreamImpl<K, VR>(this.builder, name, this.sourceNodes, this.repartitionRequired);
    }

    /**
     * Adds one {@code KStreamBranch} node for all predicates, plus one pass-through child node
     * per predicate, and returns a stream for each child in predicate order.
     *
     * @param predicates the branch predicates; at least one, none of them null
     * @return an array with one stream per predicate, each carrying this stream's source nodes
     *         and repartition flag
     * @throws IllegalArgumentException if no predicate is given
     * @throws NullPointerException     if any predicate is null
     */
    @Override
    public KStream<K, V>[] branch(Predicate<? super K, ? super V> ... predicates) {
        final int branchCount = predicates.length;
        if (branchCount == 0) {
            throw new IllegalArgumentException("you must provide at least one predicate");
        }
        for (int i = 0; i < branchCount; ++i) {
            Objects.requireNonNull(predicates[i], "predicates can't have null values");
        }
        final String branchNode = this.builder.newProcessorName(BRANCH_NAME);
        // The branch node gets its own copy of the predicate array.
        final Predicate[] predicateCopy = (Predicate[])predicates.clone();
        this.builder.internalTopologyBuilder.addProcessor(branchNode, new KStreamBranch(predicateCopy), this.name);
        final KStream[] children = (KStream[])Array.newInstance(KStream.class, branchCount);
        int slot = 0;
        while (slot < branchCount) {
            final String childNode = this.builder.newProcessorName(BRANCHCHILD_NAME);
            this.builder.internalTopologyBuilder.addProcessor(childNode, new KStreamPassThrough(), branchNode);
            children[slot] = new KStreamImpl<K, V>(this.builder, childNode, this.sourceNodes, this.repartitionRequired);
            ++slot;
        }
        return children;
    }

    /**
     * Merges this stream with {@code stream} using this stream's builder.
     *
     * @param stream the stream to merge with; must not be null
     * @return the merged stream
     * @throws NullPointerException if {@code stream} is null
     */
    @Override
    public KStream<K, V> merge(KStream<K, V> stream) {
        Objects.requireNonNull(stream);
        return merge(builder, stream);
    }

    /**
     * Adds a pass-through node whose parents are this stream's node and {@code stream}'s node.
     *
     * @param builder the builder used to name and register the merge node
     * @param stream  the other stream; cast to {@code KStreamImpl}
     * @return a stream on the merge node whose source nodes are the union of both inputs and
     *         which requires repartitioning if either input does
     */
    private KStream<K, V> merge(InternalStreamsBuilder builder, KStream<K, V> stream) {
        final KStreamImpl otherImpl = (KStreamImpl)stream;
        final String mergeNode = builder.newProcessorName(MERGE_NAME);
        final String[] parents = new String[]{this.name, otherImpl.name};
        final boolean needsRepartition = otherImpl.repartitionRequired || this.repartitionRequired;
        final HashSet<String> mergedSources = new HashSet<String>();
        mergedSources.addAll(this.sourceNodes);
        mergedSources.addAll(otherImpl.sourceNodes);
        builder.internalTopologyBuilder.addProcessor(mergeNode, new KStreamPassThrough(), parents);
        return new KStreamImpl<K, V>(builder, mergeNode, mergedSources, needsRepartition);
    }

    /**
     * Packs the serdes and partitioner into a {@code Produced} and delegates to
     * {@link #through(String, Produced)}.
     *
     * @param keySerde    key serde for the topic
     * @param valSerde    value serde for the topic
     * @param partitioner partitioner for the topic
     * @param topic       the topic to write to and read back from
     * @return the stream returned by {@link #through(String, Produced)}
     */
    @Override
    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) {
        final Produced<K, V> produced = Produced.with(keySerde, valSerde, partitioner);
        return through(topic, produced);
    }

    /**
     * Writes this stream to {@code topic} and returns a new stream that reads the same topic back.
     *
     * <p>The read side uses the serdes from {@code produced}, a {@code FailOnInvalidTimestamp}
     * extractor, and {@code null} as the last {@code ConsumedInternal} argument.
     *
     * @param topic    the topic to write to and read back from
     * @param produced serdes and partitioner for the write side
     * @return the stream created by the builder for {@code topic}
     */
    @Override
    public KStream<K, V> through(String topic, Produced<K, V> produced) {
        final ProducedInternal<K, V> internal = new ProducedInternal<K, V>(produced);
        to(topic, internal);
        final Set<String> topics = Collections.singleton(topic);
        final ConsumedInternal<K, V> consumed = new ConsumedInternal<K, V>(internal.keySerde(), internal.valueSerde(), new FailOnInvalidTimestamp(), null);
        return builder.stream(topics, consumed);
    }

    /**
     * Adds a {@code KStreamPeek} node (constructed with flag {@code false}) that runs
     * {@code action}; no downstream stream is returned.
     *
     * @param action the action handed to the node; must not be null
     * @throws NullPointerException if {@code action} is null
     */
    @Override
    public void foreach(ForeachAction<? super K, ? super V> action) {
        Objects.requireNonNull(action, "action can't be null");
        final String foreachNode = builder.newProcessorName(FOREACH_NAME);
        final KStreamPeek<K, V> supplier = new KStreamPeek<K, V>(action, false);
        builder.internalTopologyBuilder.addProcessor(foreachNode, supplier, this.name);
    }

    /**
     * Adds a {@code KStreamPeek} node (constructed with flag {@code true}) that runs
     * {@code action}, and returns a stream on that node.
     *
     * @param action the action handed to the node; must not be null
     * @return a stream on the new node with the same source nodes and repartition flag as this one
     * @throws NullPointerException if {@code action} is null
     */
    @Override
    public KStream<K, V> peek(ForeachAction<? super K, ? super V> action) {
        Objects.requireNonNull(action, "action can't be null");
        final String peekNode = builder.newProcessorName(PEEK_NAME);
        final KStreamPeek<K, V> supplier = new KStreamPeek<K, V>(action, true);
        builder.internalTopologyBuilder.addProcessor(peekNode, supplier, this.name);
        return new KStreamImpl<K, V>(builder, peekNode, sourceNodes, repartitionRequired);
    }

    /**
     * Packs the serdes into a {@code Produced} and delegates to {@link #through(String, Produced)}.
     *
     * @param keySerde key serde for the topic
     * @param valSerde value serde for the topic
     * @param topic    the topic to write to and read back from
     * @return the stream returned by {@link #through(String, Produced)}
     */
    @Override
    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        final Produced<K, V> produced = Produced.with(keySerde, valSerde);
        return through(topic, produced);
    }

    /**
     * Wraps the partitioner in a {@code Produced} and delegates to {@link #through(String, Produced)}.
     *
     * @param partitioner partitioner for the topic
     * @param topic       the topic to write to and read back from
     * @return the stream returned by {@link #through(String, Produced)}
     */
    @Override
    public KStream<K, V> through(StreamPartitioner<? super K, ? super V> partitioner, String topic) {
        return through(topic, Produced.<K, V>streamPartitioner(partitioner));
    }

    /**
     * Delegates to the four-argument {@code through} overload with null serdes and a null partitioner.
     *
     * @param topic the topic to write to and read back from
     * @return the stream returned by the four-argument overload
     */
    @Override
    public KStream<K, V> through(String topic) {
        return through(null, null, null, topic);
    }

    /**
     * Writes to {@code topic} with a {@code Produced} built from null serdes and a null partitioner.
     *
     * @param topic the topic to write to
     */
    @Override
    public void to(String topic) {
        to(topic, Produced.<K, V>with(null, null, null));
    }

    /**
     * Wraps the partitioner in a {@code Produced} and delegates to {@link #to(String, Produced)}.
     *
     * @param partitioner partitioner for the topic
     * @param topic       the topic to write to
     */
    @Override
    public void to(StreamPartitioner<? super K, ? super V> partitioner, String topic) {
        to(topic, Produced.<K, V>streamPartitioner(partitioner));
    }

    /**
     * Packs the serdes into a {@code Produced} and delegates to {@link #to(String, Produced)}.
     *
     * @param keySerde key serde for the topic
     * @param valSerde value serde for the topic
     * @param topic    the topic to write to
     */
    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        final Produced<K, V> produced = Produced.with(keySerde, valSerde);
        to(topic, produced);
    }

    /**
     * Checks {@code topic}, packs the serdes and partitioner into a {@code Produced}, and
     * delegates to {@link #to(String, Produced)}.
     *
     * @param keySerde    key serde for the topic
     * @param valSerde    value serde for the topic
     * @param partitioner partitioner for the topic
     * @param topic       the topic to write to; must not be null
     * @throws NullPointerException if {@code topic} is null
     */
    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) {
        Objects.requireNonNull(topic, "topic can't be null");
        final Produced<K, V> produced = Produced.with(keySerde, valSerde, partitioner);
        to(topic, produced);
    }

    /**
     * Validates the arguments, wraps {@code produced} in a {@code ProducedInternal}, and hands
     * it to the private sink-building overload.
     *
     * @param topic    the topic to write to; must not be null
     * @param produced serdes and partitioner for the sink; must not be null
     * @throws NullPointerException if {@code topic} or {@code produced} is null
     */
    @Override
    public void to(String topic, Produced<K, V> produced) {
        Objects.requireNonNull(topic, "topic can't be null");
        Objects.requireNonNull(produced, "Produced can't be null");
        final ProducedInternal<K, V> internal = new ProducedInternal<K, V>(produced);
        to(topic, internal);
    }

    /**
     * Adds a sink node for {@code topic} below this stream's node.
     *
     * <p>Serializers are taken from {@code produced}'s serdes (null when a serde is absent).
     * When no partitioner is configured and the key serializer is a {@code WindowedSerializer},
     * a {@code WindowedStreamPartitioner} built from that serializer is used instead.
     *
     * <p>This is a private helper, not an interface method, so it carries no {@code @Override}.
     *
     * @param topic    the topic to write to
     * @param produced resolved serdes and partitioner for the sink
     */
    private void to(String topic, ProducedInternal<K, V> produced) {
        String name = this.builder.newProcessorName(SINK_NAME);
        Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
        Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer();
        StreamPartitioner<K, V> partitioner = produced.streamPartitioner();
        // instanceof is already false for null, so no separate null check is needed.
        if (partitioner == null && keySerializer instanceof WindowedSerializer) {
            WindowedSerializer windowedSerializer = (WindowedSerializer)keySerializer;
            WindowedStreamPartitioner windowedPartitioner = new WindowedStreamPartitioner(topic, windowedSerializer);
            this.builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, windowedPartitioner, this.name);
        } else {
            this.builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
        }
    }

    /**
     * Adds a {@code KStreamTransform} node and, if any are given, connects it to the named state stores.
     *
     * <p>The returned stream is flagged as requiring repartitioning, because the transformer
     * may change keys.
     *
     * @param transformerSupplier supplies the transformer; must not be null
     * @param stateStoreNames     state stores to connect to the new node; may be null or empty
     * @return a stream of {@code K1}/{@code V1} records on the new node
     * @throws NullPointerException if {@code transformerSupplier} is null
     */
    @Override
    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        String name = this.builder.newProcessorName(TRANSFORM_NAME);
        this.builder.internalTopologyBuilder.addProcessor(name, new KStreamTransform<K, V, K1, V1>(transformerSupplier), this.name);
        if (stateStoreNames != null && stateStoreNames.length > 0) {
            this.builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
        }
        // Parameterized with the transformed types so the result matches the declared KStream<K1, V1>.
        return new KStreamImpl<K1, V1>(this.builder, name, this.sourceNodes, true);
    }

    /**
     * Converts the key-agnostic supplier with {@code toInternalValueTransformerSupplier(...)}
     * and delegates to the private {@code transformValues} helper.
     *
     * @param valueTransformerSupplier supplies the value transformer; must not be null
     * @param stateStoreNames          state stores to connect to the new node
     * @return the stream returned by the private helper
     * @throws NullPointerException if {@code valueTransformerSupplier} is null
     */
    @Override
    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
    }

    /**
     * Converts the key-aware supplier with {@code toInternalValueTransformerSupplier(...)}
     * and delegates to the private {@code transformValues} helper.
     *
     * @param valueTransformerSupplier supplies the value transformer; must not be null
     * @param stateStoreNames          state stores to connect to the new node
     * @return the stream returned by the private helper
     * @throws NullPointerException if {@code valueTransformerSupplier} is null
     */
    @Override
    public <VR> KStream<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
    }

    /**
     * Adds a {@code KStreamTransformValues} node and, if any are given, connects it to the
     * named state stores.
     *
     * <p>Keys are untouched, so the repartition flag of this stream is carried over unchanged.
     * This is a private helper, not an interface method, so it carries no {@code @Override}.
     *
     * @param internalValueTransformerWithKeySupplier supplies the value transformer
     * @param stateStoreNames                         state stores to connect; may be null or empty
     * @return a stream of {@code K}/{@code VR} records on the new node
     */
    private <VR> KStream<K, VR> transformValues(InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> internalValueTransformerWithKeySupplier, String ... stateStoreNames) {
        String name = this.builder.newProcessorName(TRANSFORMVALUES_NAME);
        this.builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<K, V, VR>(internalValueTransformerWithKeySupplier), this.name);
        if (stateStoreNames != null && stateStoreNames.length > 0) {
            this.builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
        }
        // The value type changes to VR; parameterize the result to match the declared return type.
        return new KStreamImpl<K, VR>(this.builder, name, this.sourceNodes, this.repartitionRequired);
    }

    /**
     * Adds a node running {@code processorSupplier} below this stream's node and, if any are
     * given, connects it to the named state stores.
     *
     * @param processorSupplier supplies the processor for the new node
     * @param stateStoreNames   state stores to connect to the new node; may be null or empty
     */
    @Override
    public void process(ProcessorSupplier<? super K, ? super V> processorSupplier, String ... stateStoreNames) {
        final String processorNode = builder.newProcessorName(PROCESSOR_NAME);
        builder.internalTopologyBuilder.addProcessor(processorNode, processorSupplier, this.name);
        final boolean hasStores = stateStoreNames != null && stateStoreNames.length > 0;
        if (hasStores) {
            builder.internalTopologyBuilder.connectProcessorAndStateStores(processorNode, stateStoreNames);
        }
    }

    /**
     * Windowed stream-stream join with explicit serdes; runs {@code doJoin} with
     * {@code KStreamImplJoin(false, false)}.
     *
     * @param other           the stream to join with
     * @param joiner          combines the two values
     * @param windows         the join window
     * @param keySerde        key serde
     * @param thisValueSerde  value serde of this stream
     * @param otherValueSerde value serde of {@code other}
     * @return the joined stream
     */
    @Override
    public <V1, R> KStream<K, R> join(KStream<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde) {
        final Joined<K, V, V1> joined = Joined.with(keySerde, thisValueSerde, otherValueSerde);
        return doJoin(other, joiner, windows, joined, new KStreamImplJoin(false, false));
    }

    /**
     * Windowed stream-stream join with a {@code Joined} built from null serdes.
     *
     * @param other   the stream to join with
     * @param joiner  combines the two values
     * @param windows the join window
     * @return the joined stream
     */
    @Override
    public <V1, R> KStream<K, R> join(KStream<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, JoinWindows windows) {
        return join(other, joiner, windows, Joined.<K, V, V1>with(null, null, null));
    }

    /**
     * Windowed stream-stream join; runs {@code doJoin} with {@code KStreamImplJoin(false, false)}.
     *
     * @param otherStream the stream to join with
     * @param joiner      combines the two values
     * @param windows     the join window
     * @param joined      serdes for the join
     * @return the joined stream
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, Joined<K, V, VO> joined) {
        final KStreamImplJoin innerJoin = new KStreamImplJoin(false, false);
        return doJoin(otherStream, joiner, windows, joined, innerJoin);
    }

    /**
     * Windowed outer join with explicit serdes; packs them into a {@code Joined} and delegates.
     *
     * @param other           the stream to join with
     * @param joiner          combines the two values
     * @param windows         the join window
     * @param keySerde        key serde
     * @param thisValueSerde  value serde of this stream
     * @param otherValueSerde value serde of {@code other}
     * @return the joined stream
     */
    @Override
    public <V1, R> KStream<K, R> outerJoin(KStream<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde) {
        final Joined<K, V, V1> joined = Joined.with(keySerde, thisValueSerde, otherValueSerde);
        return outerJoin(other, joiner, windows, joined);
    }

    /**
     * Windowed outer join with a {@code Joined} built from null serdes.
     *
     * @param other   the stream to join with
     * @param joiner  combines the two values
     * @param windows the join window
     * @return the joined stream
     */
    @Override
    public <V1, R> KStream<K, R> outerJoin(KStream<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, JoinWindows windows) {
        return outerJoin(other, joiner, windows, Joined.<K, V, V1>with(null, null, null));
    }

    /**
     * Windowed outer join; runs {@code doJoin} with {@code KStreamImplJoin(true, true)}.
     *
     * @param other   the stream to join with
     * @param joiner  combines the two values
     * @param windows the join window
     * @param joined  serdes for the join
     * @return the joined stream
     */
    @Override
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, Joined<K, V, VO> joined) {
        final KStreamImplJoin outer = new KStreamImplJoin(true, true);
        return doJoin(other, joiner, windows, joined, outer);
    }

    /**
     * Shared implementation of the windowed stream-stream joins.
     *
     * <p>Either side that is flagged as requiring repartitioning is first replaced by its
     * repartitioned counterpart; the two sides are then checked with {@code ensureJoinableWith}
     * and handed to {@code join}.
     *
     * @param other   the stream to join with; must not be null
     * @param joiner  combines the two values; must not be null
     * @param windows the join window; must not be null
     * @param joined  serdes for the join (also used for repartitioning); must not be null
     * @param join    the join strategy to run
     * @return the joined stream
     * @throws NullPointerException if {@code other}, {@code joiner}, {@code windows} or {@code joined} is null
     */
    private <V1, R> KStream<K, R> doJoin(KStream<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, JoinWindows windows, Joined<K, V, V1> joined, KStreamImplJoin join) {
        Objects.requireNonNull(other, "other KStream can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(windows, "windows can't be null");
        Objects.requireNonNull(joined, "joined can't be null");
        final KStreamImpl<K, V1> otherImpl = (KStreamImpl<K, V1>)other;
        final KStreamImpl<K, V> left = this.repartitionRequired
                ? this.repartitionForJoin(joined.keySerde(), joined.valueSerde())
                : this;
        final KStreamImpl<K, V1> right = otherImpl.repartitionRequired
                ? otherImpl.repartitionForJoin(joined.keySerde(), joined.otherValueSerde())
                : otherImpl;
        left.ensureJoinableWith(right);
        return join.join(left, right, joiner, windows, joined);
    }

    /**
     * Routes this stream through a repartition topic and returns a stream on the new source node.
     *
     * @param keySerde key serde for the repartition topic
     * @param valSerde value serde for the repartition topic
     * @return a stream whose only source node is the repartition source and whose repartition
     *         flag is cleared
     */
    private KStreamImpl<K, V> repartitionForJoin(Serde<K> keySerde, Serde<V> valSerde) {
        final String sourceNode = createReparitionedSource(this.builder, keySerde, valSerde, null, this.name);
        final Set<String> onlySource = Collections.singleton(sourceNode);
        return new KStreamImpl<K, V>(this.builder, sourceNode, onlySource, false);
    }

    /**
     * Wires a filter, a sink, and a source around an internal repartition topic.
     *
     * <p>The topic name is {@code topicNamePrefix} (or {@code name} when the prefix is null)
     * followed by {@code REPARTITION_TOPIC_SUFFIX}. The filter sits below {@code name} and only
     * lets records with a non-null key through to the sink; the source reads the same topic
     * back with a {@code FailOnInvalidTimestamp} extractor.
     *
     * @param builder         builder used to name and register the nodes
     * @param keySerde        key serde; when null, null (de)serializers are registered
     * @param valSerde        value serde; when null, null (de)serializers are registered
     * @param topicNamePrefix base name for the topic, or null to use {@code name}
     * @param name            the parent node the filter is attached to
     * @return the name of the new source node
     */
    static <K1, V1> String createReparitionedSource(InternalStreamsBuilder builder, Serde<K1> keySerde, Serde<V1> valSerde, String topicNamePrefix, String name) {
        final Serializer<K1> keySer = keySerde == null ? null : keySerde.serializer();
        final Serializer<V1> valSer = valSerde == null ? null : valSerde.serializer();
        final Deserializer<K1> keyDeser = keySerde == null ? null : keySerde.deserializer();
        final Deserializer<V1> valDeser = valSerde == null ? null : valSerde.deserializer();
        final String topic = (topicNamePrefix == null ? name : topicNamePrefix) + REPARTITION_TOPIC_SUFFIX;
        // Node names are generated in this fixed order: sink, filter, source.
        final String sinkNode = builder.newProcessorName(SINK_NAME);
        final String filterNode = builder.newProcessorName(FILTER_NAME);
        final String sourceNode = builder.newProcessorName(SOURCE_NAME);
        final Predicate<K1, V1> hasKey = new Predicate<K1, V1>(){
            @Override
            public boolean test(K1 key, V1 value) {
                return key != null;
            }
        };
        builder.internalTopologyBuilder.addInternalTopic(topic);
        builder.internalTopologyBuilder.addProcessor(filterNode, new KStreamFilter(hasKey, false), name);
        builder.internalTopologyBuilder.addSink(sinkNode, topic, keySer, valSer, null, filterNode);
        builder.internalTopologyBuilder.addSource(null, sourceNode, (TimestampExtractor)new FailOnInvalidTimestamp(), keyDeser, valDeser, topic);
        return sourceNode;
    }

    /**
     * Windowed left join with explicit serdes; runs {@code doJoin} with
     * {@code KStreamImplJoin(true, false)}.
     *
     * @param other           the stream to join with
     * @param joiner          combines the two values
     * @param windows         the join window
     * @param keySerde        key serde
     * @param thisValSerde    value serde of this stream
     * @param otherValueSerde value serde of {@code other}
     * @return the joined stream
     */
    @Override
    public <V1, R> KStream<K, R> leftJoin(KStream<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValSerde, Serde<V1> otherValueSerde) {
        final Joined<K, V, V1> joined = Joined.with(keySerde, thisValSerde, otherValueSerde);
        return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, false));
    }

    /**
     * Windowed left join with a {@code Joined} built from null serdes.
     *
     * @param other   the stream to join with
     * @param joiner  combines the two values
     * @param windows the join window
     * @return the joined stream
     */
    @Override
    public <V1, R> KStream<K, R> leftJoin(KStream<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, JoinWindows windows) {
        return leftJoin(other, joiner, windows, Joined.<K, V, V1>with(null, null, null));
    }

    /**
     * Windowed left join; runs {@code doJoin} with {@code KStreamImplJoin(true, false)}.
     *
     * @param other   the stream to join with
     * @param joiner  combines the two values
     * @param windows the join window
     * @param joined  serdes for the join; must not be null
     * @return the joined stream
     * @throws NullPointerException if {@code joined} is null
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, Joined<K, V, VO> joined) {
        Objects.requireNonNull(joined, "joined can't be null");
        final KStreamImplJoin left = new KStreamImplJoin(true, false);
        return doJoin(other, joiner, windows, joined, left);
    }

    /**
     * Stream-table join with a {@code Joined} built from null serdes.
     *
     * @param other  the table to join with
     * @param joiner combines the stream value with the table value
     * @return the joined stream
     */
    @Override
    public <V1, R> KStream<K, R> join(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
        return join(other, joiner, Joined.<K, V, V1>with(null, null, null));
    }

    /**
     * Stream-table inner join.
     *
     * <p>If this stream is flagged as requiring repartitioning, it is first routed through a
     * repartition topic (using the key and value serdes from {@code joined}) and the join is
     * performed on the repartitioned stream; otherwise the join is performed on this stream directly.
     *
     * @param other  the table to join with; must not be null
     * @param joiner combines the stream value with the table value; must not be null
     * @param joined serdes used when repartitioning; must not be null
     * @return the joined stream
     * @throws NullPointerException if {@code other}, {@code joiner} or {@code joined} is null
     */
    @Override
    public <VT, VR> KStream<K, VR> join(KTable<K, VT> other, ValueJoiner<? super V, ? super VT, ? extends VR> joiner, Joined<K, V, VT> joined) {
        Objects.requireNonNull(other, "other can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");
        if (this.repartitionRequired) {
            KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(joined.keySerde(), joined.valueSerde());
            // Join on the repartitioned stream; joining on the original would bypass the repartition topic.
            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false);
        }
        return this.doStreamTableJoin(other, joiner, false);
    }

    /**
     * Stream-table join with explicit key and stream-value serdes; the table-value serde is null.
     *
     * @param other      the table to join with
     * @param joiner     combines the stream value with the table value
     * @param keySerde   key serde
     * @param valueSerde value serde of this stream
     * @return the joined stream
     */
    @Override
    public <V1, R> KStream<K, R> join(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, Serde<K> keySerde, Serde<V> valueSerde) {
        final Joined<K, V, V1> joined = Joined.with(keySerde, valueSerde, null);
        return join(other, joiner, joined);
    }

    /**
     * Left join against a global table; delegates to {@code globalTableJoin} with {@code leftJoin = true}.
     *
     * @param globalTable the global table to join with
     * @param keyMapper   maps a stream record to the table key to look up
     * @param joiner      combines the stream value with the table value
     * @return the joined stream
     */
    @Override
    public <K1, V1, R> KStream<K, R> leftJoin(GlobalKTable<K1, V1> globalTable, KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper, ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
        final boolean asLeftJoin = true;
        return globalTableJoin(globalTable, keyMapper, joiner, asLeftJoin);
    }

    /**
     * Inner join against a global table; delegates to {@code globalTableJoin} with {@code leftJoin = false}.
     *
     * @param globalTable the global table to join with
     * @param keyMapper   maps a stream record to the table key to look up
     * @param joiner      combines the stream value with the table value
     * @return the joined stream
     */
    @Override
    public <K1, V1, V2> KStream<K, V2> join(GlobalKTable<K1, V1> globalTable, KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper, ValueJoiner<? super V, ? super V1, ? extends V2> joiner) {
        final boolean asLeftJoin = false;
        return globalTableJoin(globalTable, keyMapper, joiner, asLeftJoin);
    }

    /**
     * Adds a {@code KStreamGlobalKTableJoin} node below this stream's node.
     *
     * <p>The node name uses the {@code LEFTJOIN_NAME} prefix for both inner and left joins, and
     * the returned stream's repartition flag is always {@code false}.
     *
     * @param globalTable the global table to join with; must not be null
     * @param keyMapper   maps a stream record to the table key to look up; must not be null
     * @param joiner      combines the stream value with the table value; must not be null
     * @param leftJoin    forwarded to the join node
     * @return a stream of {@code K}/{@code V2} records on the new node
     * @throws NullPointerException if {@code globalTable}, {@code keyMapper} or {@code joiner} is null
     */
    private <K1, V1, V2> KStream<K, V2> globalTableJoin(GlobalKTable<K1, V1> globalTable, KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper, ValueJoiner<? super V, ? super V1, ? extends V2> joiner, boolean leftJoin) {
        Objects.requireNonNull(globalTable, "globalTable can't be null");
        Objects.requireNonNull(keyMapper, "keyMapper can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        KTableValueGetterSupplier valueGetterSupplier = ((GlobalKTableImpl)globalTable).valueGetterSupplier();
        String name = this.builder.newProcessorName(LEFTJOIN_NAME);
        this.builder.internalTopologyBuilder.addProcessor(name, new KStreamGlobalKTableJoin<K, K1, V2, V, V1>(valueGetterSupplier, joiner, keyMapper, leftJoin), this.name);
        // The join emits V2 values; parameterize the result to match the declared KStream<K, V2>.
        return new KStreamImpl<K, V2>(this.builder, name, this.sourceNodes, false);
    }

    /**
     * Adds a {@code KStreamKTableJoin} node below this stream's node and wires it to the table.
     *
     * <p>The new node is connected to the state stores reported by the table's value getter
     * supplier, and this stream's node is connected to the table's node.
     *
     * @param other    the table to join with (expected to be a {@code KTableImpl}); must not be null
     * @param joiner   combines the stream value with the table value; must not be null
     * @param leftJoin selects the {@code LEFTJOIN_NAME} or {@code JOIN_NAME} prefix and is
     *                 forwarded to the join node
     * @return a stream of {@code K}/{@code R} records whose source nodes are those returned by
     *         {@code ensureJoinableWith} and whose repartition flag is {@code false}
     * @throws NullPointerException if {@code other} or {@code joiner} is null
     */
    private <V1, R> KStream<K, R> doStreamTableJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, boolean leftJoin) {
        Objects.requireNonNull(other, "other KTable can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Set<String> allSourceNodes = this.ensureJoinableWith((AbstractStream<K>)other);
        String name = this.builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
        this.builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin(((KTableImpl)other).valueGetterSupplier(), joiner, leftJoin), this.name);
        this.builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl)other).valueGetterSupplier().storeNames());
        this.builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl)other).name);
        // The join emits R values; parameterize the result to match the declared KStream<K, R>.
        return new KStreamImpl<K, R>(this.builder, name, allSourceNodes, false);
    }

    /**
     * Stream-table left join with a {@code Joined} built from null serdes.
     *
     * @param other  the table to join with
     * @param joiner combines the stream value with the table value
     * @return the joined stream
     */
    @Override
    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
        return leftJoin(other, joiner, Joined.<K, V, V1>with(null, null, null));
    }

    /**
     * Stream-table left join.
     *
     * <p>If this stream is flagged as requiring repartitioning, it is first routed through a
     * repartition topic (using the key and value serdes from {@code joined}) and the join is
     * performed on the repartitioned stream; otherwise the join is performed on this stream directly.
     *
     * @param other  the table to join with; must not be null
     * @param joiner combines the stream value with the table value; must not be null
     * @param joined serdes used when repartitioning; must not be null
     * @return the joined stream
     * @throws NullPointerException if {@code other}, {@code joiner} or {@code joined} is null
     */
    @Override
    public <VT, VR> KStream<K, VR> leftJoin(KTable<K, VT> other, ValueJoiner<? super V, ? super VT, ? extends VR> joiner, Joined<K, V, VT> joined) {
        Objects.requireNonNull(other, "other can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");
        if (this.repartitionRequired) {
            KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(joined.keySerde(), joined.valueSerde());
            // Join on the repartitioned stream; joining on the original would bypass the repartition topic.
            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true);
        }
        return this.doStreamTableJoin(other, joiner, true);
    }

    /**
     * Left-joins this stream with {@code other}, delegating to the {@link Joined}-based
     * overload. The given key and value serdes are wrapped in a {@code Joined} whose third
     * serde is {@code null}.
     *
     * @param other      the table to join against
     * @param joiner     combines a stream value with a table value into the result value
     * @param keySerde   first serde handed to {@code Joined.with}
     * @param valueSerde second serde handed to {@code Joined.with}
     * @return the stream produced by the delegate overload
     */
    @Override
    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, Serde<K> keySerde, Serde<V> valueSerde) {
        final Joined<K, V, V1> streamSideSerdes = Joined.with(keySerde, valueSerde, null);
        return this.leftJoin(other, joiner, streamSideSerdes);
    }

    /**
     * Groups this stream by the key produced by {@code selector}, delegating to the
     * {@link Serialized}-based overload with a {@code Serialized} built from two {@code null} serdes.
     *
     * @param selector computes the grouping key from each record's key and value
     * @return the grouped stream produced by the delegate overload
     */
    @Override
    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<? super K, ? super V, K1> selector) {
        final Serialized<K1, V> unspecifiedSerdes = Serialized.with(null, null);
        return this.groupBy(selector, unspecifiedSerdes);
    }

    /**
     * Groups this stream by the key produced by {@code selector}.
     * <p>
     * The selector is registered through {@code internalSelectKey}, and the name it returns becomes
     * the node name of the grouped stream. The grouped stream's last constructor argument is always
     * {@code true} here, in the position where {@code groupByKey} passes {@code this.repartitionRequired}.
     *
     * @param selector   computes the grouping key from each record's key and value
     * @param serialized supplies the key and value serdes for the grouped stream
     * @return a grouped stream over this stream's source nodes
     * @throws NullPointerException if {@code selector} or {@code serialized} is {@code null}
     */
    @Override
    public <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> selector, Serialized<KR, V> serialized) {
        Objects.requireNonNull(selector, "selector can't be null");
        Objects.requireNonNull(serialized, "serialized can't be null");
        final SerializedInternal<KR, V> serdes = new SerializedInternal<KR, V>(serialized);
        final String keySelectorNode = this.internalSelectKey(selector);
        return new KGroupedStreamImpl<KR, V>(
                this.builder,
                keySelectorNode,
                this.sourceNodes,
                serdes.keySerde(),
                serdes.valueSerde(),
                true);
    }

    /**
     * Groups this stream by the key produced by {@code selector}, delegating to the
     * {@link Serialized}-based overload with the given serdes wrapped in a {@code Serialized}.
     *
     * @param selector computes the grouping key from each record's key and value
     * @param keySerde first serde handed to {@code Serialized.with}
     * @param valSerde second serde handed to {@code Serialized.with}
     * @return the grouped stream produced by the delegate overload
     * @throws NullPointerException if {@code selector} is {@code null}
     */
    @Override
    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<? super K, ? super V, K1> selector, Serde<K1> keySerde, Serde<V> valSerde) {
        // Validate before building the Serialized wrapper, as the delegate would only check afterwards.
        Objects.requireNonNull(selector, "selector can't be null");
        final Serialized<K1, V> explicitSerdes = Serialized.with(keySerde, valSerde);
        return this.groupBy(selector, explicitSerdes);
    }

    /**
     * Groups this stream by its current key, delegating to the {@link Serialized}-based
     * overload with a {@code Serialized} built from two {@code null} serdes.
     *
     * @return the grouped stream produced by the delegate overload
     */
    @Override
    public KGroupedStream<K, V> groupByKey() {
        final Serialized<K, V> unspecifiedSerdes = Serialized.with(null, null);
        return this.groupByKey(unspecifiedSerdes);
    }

    /**
     * Groups this stream by its current key.
     * <p>
     * The grouped stream reuses this stream's node name and source nodes and inherits this
     * stream's {@code repartitionRequired} flag.
     *
     * @param serialized supplies the key and value serdes for the grouped stream
     * @return a grouped stream positioned at this stream's node
     * @throws NullPointerException if {@code serialized} is {@code null}
     */
    @Override
    public KGroupedStream<K, V> groupByKey(Serialized<K, V> serialized) {
        // Same explicit check and message as groupBy(selector, serialized), so a null argument
        // fails here with a descriptive message rather than somewhere inside SerializedInternal.
        Objects.requireNonNull(serialized, "serialized can't be null");
        SerializedInternal<K, V> serializedInternal = new SerializedInternal<K, V>(serialized);
        return new KGroupedStreamImpl<K, V>(this.builder, this.name, this.sourceNodes, serializedInternal.keySerde(), serializedInternal.valueSerde(), this.repartitionRequired);
    }

    /**
     * Groups this stream by its current key, delegating to the {@link Serialized}-based
     * overload with the given serdes wrapped in a {@code Serialized}.
     *
     * @param keySerde first serde handed to {@code Serialized.with}
     * @param valSerde second serde handed to {@code Serialized.with}
     * @return the grouped stream produced by the delegate overload
     */
    @Override
    public KGroupedStream<K, V> groupByKey(Serde<K> keySerde, Serde<V> valSerde) {
        final Serialized<K, V> explicitSerdes = Serialized.with(keySerde, valSerde);
        return this.groupByKey(explicitSerdes);
    }

    /**
     * Builds a window store builder for one side of a windowed join.
     * <p>
     * The underlying store comes from {@code Stores.persistentWindowStore}, configured from
     * {@code windows.maintainMs()}, {@code windows.segments} and {@code windows.size()}, with the
     * factory's trailing boolean argument set to {@code true}.
     *
     * @param windows    join window specification the store is sized from
     * @param keySerde   key serde handed to the store builder
     * @param valueSerde value serde handed to the store builder
     * @param storeName  name of the store
     * @return a store builder wrapping the persistent window store
     */
    private static <K, V> StoreBuilder<WindowStore<K, V>> createWindowedStateStore(JoinWindows windows, Serde<K> keySerde, Serde<V> valueSerde, String storeName) {
        // Read in the same order the original argument list evaluated them.
        final long maintainMs = windows.maintainMs();
        final int segmentCount = windows.segments;
        final long windowSizeMs = windows.size();
        return Stores.windowStoreBuilder(
                Stores.persistentWindowStore(storeName, maintainMs, segmentCount, windowSizeMs, true),
                keySerde,
                valueSerde);
    }

    /**
     * Builds the processor nodes and window stores for a windowed stream-stream join on the
     * enclosing stream's builder. The two flags are fixed at construction and select the
     * node-name prefixes and the boolean handed to each {@code KStreamKStreamJoin}.
     */
    private class KStreamImplJoin {
        private final boolean leftOuter;
        private final boolean rightOuter;

        /**
         * @param leftOuter  flag passed to the join processor fed by {@code lhs}; also selects the other-side name prefix
         * @param rightOuter flag passed to the join processor fed by {@code other}; also selects the this-side name prefix
         */
        KStreamImplJoin(boolean leftOuter, boolean rightOuter) {
            this.leftOuter = leftOuter;
            this.rightOuter = rightOuter;
        }

        /**
         * Adds five processors and two window stores to the topology:
         * a window processor per input, a join processor per input, and a pass-through node
         * that has both join processors as parents. Each join processor is given the name of
         * the opposite side's store, and each store is connected to its own side's window
         * processor plus the opposite side's join processor.
         *
         * @param lhs     left input; must be an {@link AbstractStream}
         * @param other   right input; must be a {@code KStreamImpl}
         * @param joiner  combines a left value with a right value; the right-hand join uses {@code reverseJoiner(joiner)}
         * @param windows supplies {@code beforeMs}, {@code afterMs} and {@code maintainMs()} for the processors and stores
         * @param joined  supplies the key serde and the two value serdes for the stores
         * @return a new stream positioned at the merge node, carrying the source nodes of both inputs
         */
        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs, KStream<K1, V2> other, ValueJoiner<? super V1, ? super V2, ? extends R> joiner, JoinWindows windows, Joined<K1, V1, V2> joined) {
            // Names are requested in a fixed order: both window nodes, this-side join,
            // other-side join, then merge. Keep this order when editing.
            String lhsWindowNode = builder.newProcessorName(WINDOWED_NAME);
            String rhsWindowNode = builder.newProcessorName(WINDOWED_NAME);
            // NOTE: the this-side prefix is keyed on rightOuter and the other-side prefix on leftOuter.
            String lhsJoinNode = builder.newProcessorName(this.rightOuter ? OUTERTHIS_NAME : JOINTHIS_NAME);
            String rhsJoinNode = builder.newProcessorName(this.leftOuter ? OUTEROTHER_NAME : JOINOTHER_NAME);
            String mergeNode = builder.newProcessorName(MERGE_NAME);

            // One window store per side, named after that side's join node.
            StoreBuilder lhsStore = createWindowedStateStore(windows, joined.keySerde(), joined.valueSerde(), lhsJoinNode + "-store");
            StoreBuilder rhsStore = createWindowedStateStore(windows, joined.keySerde(), joined.otherValueSerde(), rhsJoinNode + "-store");

            KStreamJoinWindow lhsWindowProcessor = new KStreamJoinWindow(lhsStore.name(), windows.beforeMs + windows.afterMs + 1L, windows.maintainMs());
            KStreamJoinWindow rhsWindowProcessor = new KStreamJoinWindow(rhsStore.name(), windows.beforeMs + windows.afterMs + 1L, windows.maintainMs());

            // Each join processor is given the opposite side's store name; the right-hand one
            // swaps before/after and uses the reversed joiner.
            KStreamKStreamJoin lhsJoinProcessor = new KStreamKStreamJoin(rhsStore.name(), windows.beforeMs, windows.afterMs, joiner, this.leftOuter);
            KStreamKStreamJoin rhsJoinProcessor = new KStreamKStreamJoin(lhsStore.name(), windows.afterMs, windows.beforeMs, AbstractStream.reverseJoiner(joiner), this.rightOuter);
            KStreamPassThrough mergeProcessor = new KStreamPassThrough();

            builder.internalTopologyBuilder.addProcessor(lhsWindowNode, lhsWindowProcessor, ((AbstractStream)((Object)lhs)).name);
            builder.internalTopologyBuilder.addProcessor(rhsWindowNode, rhsWindowProcessor, ((AbstractStream)((Object)other)).name);
            builder.internalTopologyBuilder.addProcessor(lhsJoinNode, lhsJoinProcessor, lhsWindowNode);
            builder.internalTopologyBuilder.addProcessor(rhsJoinNode, rhsJoinProcessor, rhsWindowNode);
            builder.internalTopologyBuilder.addProcessor(mergeNode, mergeProcessor, lhsJoinNode, rhsJoinNode);
            builder.internalTopologyBuilder.addStateStore(lhsStore, lhsWindowNode, rhsJoinNode);
            builder.internalTopologyBuilder.addStateStore(rhsStore, rhsWindowNode, lhsJoinNode);

            // The result is sourced from the union of both inputs' source nodes.
            HashSet<String> combinedSources = new HashSet<String>(((AbstractStream)((Object)lhs)).sourceNodes);
            combinedSources.addAll(((KStreamImpl)other).sourceNodes);
            return new KStreamImpl(builder, mergeNode, combinedSources, false);
        }
    }
}

