/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.NamedOperation;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.JoinedInternal;
import org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamBranch;
import org.apache.kafka.streams.kstream.internals.KStreamFilter;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMap;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues;
import org.apache.kafka.streams.kstream.internals.KStreamGlobalKTableJoin;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin;
import org.apache.kafka.streams.kstream.internals.KStreamKTableJoin;
import org.apache.kafka.streams.kstream.internals.KStreamMap;
import org.apache.kafka.streams.kstream.internals.KStreamMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamPassThrough;
import org.apache.kafka.streams.kstream.internals.KStreamPeek;
import org.apache.kafka.streams.kstream.internals.KStreamTransformValues;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.PrintedInternal;
import org.apache.kafka.streams.kstream.internals.ProducedInternal;
import org.apache.kafka.streams.kstream.internals.SerializedInternal;
import org.apache.kafka.streams.kstream.internals.StreamJoinedInternal;
import org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;

public class KStreamImpl<K, V>
extends AbstractStream<K, V>
implements KStream<K, V> {
    // Package-private name prefixes/suffixes. In the visible part of this class only
    // MERGE_NAME and SINK_NAME are referenced; the remaining ones are presumably used by
    // sibling classes in this package (e.g. the join helper) -- TODO confirm against callers.
    static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
    static final String JOIN_NAME = "KSTREAM-JOIN-";
    static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
    static final String MERGE_NAME = "KSTREAM-MERGE-";
    static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
    static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
    static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    static final String SINK_NAME = "KSTREAM-SINK-";
    static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
    // Prefixes handed to the builder's name generation (orElseGenerateWithPrefix,
    // suffixWithOrElseGet, newProcessorName) by the operations of this class.
    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
    private static final String FILTER_NAME = "KSTREAM-FILTER-";
    private static final String PEEK_NAME = "KSTREAM-PEEK-";
    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
    private static final String MAP_NAME = "KSTREAM-MAP-";
    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
    // Set at construction. Operations in this class that may change the key create their
    // result stream with this flag set to true; the others pass the current value along.
    private final boolean repartitionRequired;

    /**
     * Creates a stream handle; everything except {@code repartitionRequired} is forwarded
     * to the {@code AbstractStream} constructor.
     *
     * @param name                the name of this stream
     * @param keySerde            key serde, may be {@code null} (several operations in this class pass {@code null})
     * @param valueSerde          value serde, may be {@code null}
     * @param sourceNodes         names of the source nodes this stream is derived from
     * @param repartitionRequired flag stored on this instance and propagated to derived streams
     * @param streamsGraphNode    the graph node that child nodes of this stream are attached to
     * @param builder             the builder used to register graph nodes and generate names
     */
    KStreamImpl(String name, Serde<K> keySerde, Serde<V> valueSerde, Set<String> sourceNodes, boolean repartitionRequired, StreamsGraphNode streamsGraphNode, InternalStreamsBuilder builder) {
        super(name, keySerde, valueSerde, sourceNodes, streamsGraphNode, builder);
        this.repartitionRequired = repartitionRequired;
    }

    /**
     * Delegates to {@link #filter(Predicate, Named)} with {@code NamedInternal.empty()}.
     *
     * @param predicate the predicate passed on to the named overload
     * @return the stream returned by the named overload
     */
    @Override
    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate) {
        final Named unnamed = NamedInternal.empty();
        return filter(predicate, unnamed);
    }

    /**
     * Attaches a {@code KStreamFilter} (constructed with its boolean flag set to {@code false})
     * below this stream's graph node.
     *
     * @param predicate the predicate given to the filter processor; must not be {@code null}
     * @param named     supplies the processor name; must not be {@code null}
     * @return a new stream on the filter node that keeps this stream's serdes, source nodes
     *         and repartition flag
     * @throws NullPointerException if {@code predicate} or {@code named} is {@code null}
     */
    @Override
    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate, Named named) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
        final ProcessorParameters<? super K, ? super V> parameters =
            new ProcessorParameters<>(new KStreamFilter<>(predicate, false), processorName);
        final ProcessorGraphNode<? super K, ? super V> filterNode = new ProcessorGraphNode<>(processorName, parameters);

        builder.addGraphNode(streamsGraphNode, filterNode);
        return new KStreamImpl<>(processorName, keySerde, valSerde, sourceNodes, repartitionRequired, filterNode, builder);
    }

    /**
     * Delegates to {@link #filterNot(Predicate, Named)} with {@code NamedInternal.empty()}.
     *
     * @param predicate the predicate passed on to the named overload
     * @return the stream returned by the named overload
     */
    @Override
    public KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate) {
        final Named unnamed = NamedInternal.empty();
        return filterNot(predicate, unnamed);
    }

    /**
     * Attaches a {@code KStreamFilter} (constructed with its boolean flag set to {@code true})
     * below this stream's graph node. The generated name uses the same prefix as
     * {@link #filter(Predicate, Named)}.
     *
     * @param predicate the predicate given to the filter processor; must not be {@code null}
     * @param named     supplies the processor name; must not be {@code null}
     * @return a new stream on the filter node that keeps this stream's serdes, source nodes
     *         and repartition flag
     * @throws NullPointerException if {@code predicate} or {@code named} is {@code null}
     */
    @Override
    public KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, Named named) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
        final ProcessorParameters<? super K, ? super V> parameters =
            new ProcessorParameters<>(new KStreamFilter<>(predicate, true), processorName);
        final ProcessorGraphNode<? super K, ? super V> invertedFilterNode = new ProcessorGraphNode<>(processorName, parameters);

        builder.addGraphNode(streamsGraphNode, invertedFilterNode);
        return new KStreamImpl<>(processorName, keySerde, valSerde, sourceNodes, repartitionRequired, invertedFilterNode, builder);
    }

    /**
     * Delegates to {@link #selectKey(KeyValueMapper, Named)} with {@code NamedInternal.empty()}.
     *
     * @param mapper computes the new key from the current key and value
     * @return the stream returned by the named overload
     */
    @Override
    public <KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
        final Named unnamed = NamedInternal.empty();
        return selectKey(mapper, unnamed);
    }

    /**
     * Attaches a key-selecting processor below this stream's graph node and marks the node
     * as key changing.
     *
     * @param mapper computes the new key from the current key and value; must not be {@code null}
     * @param named  supplies the processor name; must not be {@code null}
     * @return a new stream with a {@code null} key serde, this stream's value serde and the
     *         repartition flag set to {@code true}
     * @throws NullPointerException if {@code mapper} or {@code named} is {@code null}
     */
    @Override
    public <KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final ProcessorGraphNode<K, V> selectKeyNode = internalSelectKey(mapper, new NamedInternal(named));
        selectKeyNode.keyChangingOperation(true);
        builder.addGraphNode(streamsGraphNode, selectKeyNode);

        // The key type changes to KR, so the type arguments must be inferred from the return
        // type (explicit <K, V> does not type-check) and the old key serde cannot be reused.
        return new KStreamImpl<>(selectKeyNode.nodeName(), null, valSerde, sourceNodes, true, selectKeyNode, builder);
    }

    /**
     * Builds (but does not register) the graph node for a key-selecting map: each record is
     * mapped to a {@code KeyValue} of {@code mapper.apply(key, value)} and the unchanged value.
     *
     * @param mapper computes the new key
     * @param named  supplies the node name, or a generated one using the key-select prefix
     * @return the new, not yet attached processor graph node
     */
    private <KR> ProcessorGraphNode<K, V> internalSelectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper, NamedInternal named) {
        final String name = named.orElseGenerateWithPrefix(builder, KEY_SELECT_NAME);
        // Explicit type arguments give the lambda parameters the types K and V; with the raw
        // type they would be Object and mapper.apply(key, value) would not type-check.
        final KStreamMap<K, V, KR, V> kStreamMap =
            new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value));
        final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
        return new ProcessorGraphNode<>(name, processorParameters);
    }

    /**
     * Delegates to {@link #map(KeyValueMapper, Named)} with {@code NamedInternal.empty()}.
     *
     * @param mapper maps each record to a new key-value pair
     * @return the stream returned by the named overload
     */
    @Override
    public <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
        final Named unnamed = NamedInternal.empty();
        return map(mapper, unnamed);
    }

    /**
     * Attaches a {@code KStreamMap} processor below this stream's graph node and marks the
     * node as key changing.
     *
     * @param mapper maps each record to a new key-value pair; must not be {@code null}
     * @param named  supplies the processor name; must not be {@code null}
     * @return a new stream with {@code null} key and value serdes and the repartition flag
     *         set to {@code true}
     * @throws NullPointerException if {@code mapper} or {@code named} is {@code null}
     */
    @Override
    public <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
        final ProcessorParameters<? super K, ? super V> processorParameters =
            new ProcessorParameters<>(new KStreamMap<>(mapper), name);
        final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name, processorParameters);
        mapProcessorNode.keyChangingOperation(true);
        builder.addGraphNode(streamsGraphNode, mapProcessorNode);

        // Both key and value types change, so neither serde can be carried over; the type
        // arguments are inferred from the KStream<KR, VR> return type.
        return new KStreamImpl<>(name, null, null, sourceNodes, true, mapProcessorNode, builder);
    }

    /**
     * Wraps the key-less mapper with {@code withKey} and delegates to the
     * {@code ValueMapperWithKey} overload.
     *
     * @param mapper maps each value to a new value
     * @return the stream returned by the delegate overload
     */
    @Override
    public <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper) {
        return mapValues(withKey(mapper));
    }

    /**
     * Wraps the key-less mapper with {@code withKey} and delegates to the named
     * {@code ValueMapperWithKey} overload.
     *
     * @param mapper maps each value to a new value
     * @param named  passed through unchanged to the delegate
     * @return the stream returned by the delegate overload
     */
    @Override
    public <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, Named named) {
        return mapValues(withKey(mapper), named);
    }

    /**
     * Delegates to {@link #mapValues(ValueMapperWithKey, Named)} with {@code NamedInternal.empty()}.
     *
     * @param mapper maps each key/value to a new value
     * @return the stream returned by the named overload
     */
    @Override
    public <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
        // Typed as Named so the call binds to the (ValueMapperWithKey, Named) overload.
        final Named unnamed = NamedInternal.empty();
        return mapValues(mapper, unnamed);
    }

    /**
     * Attaches a {@code KStreamMapValues} processor below this stream's graph node and marks
     * the node as value changing.
     *
     * @param mapper maps each key/value to a new value; must not be {@code null}
     * @param named  supplies the processor name; must not be {@code null}
     * @return a new stream that keeps this stream's key serde and repartition flag but has
     *         a {@code null} value serde
     * @throws NullPointerException if {@code mapper} or {@code named} is {@code null}
     */
    @Override
    public <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        // Fixed: this check previously tested `mapper` a second time, so a null `named`
        // slipped through the documented validation.
        Objects.requireNonNull(named, "named can't be null");

        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME);
        final ProcessorParameters<? super K, ? super V> processorParameters =
            new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
        final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name, processorParameters);
        mapValuesProcessorNode.setValueChangingOperation(true);
        builder.addGraphNode(streamsGraphNode, mapValuesProcessorNode);

        // The value type changes to VR: infer the type arguments from the return type and
        // drop the value serde.
        return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, mapValuesProcessorNode, builder);
    }

    /**
     * Attaches a printing processor, built by {@code PrintedInternal.build} from this stream's
     * name, below this stream's graph node.
     *
     * @param printed the print configuration; must not be {@code null}
     * @throws NullPointerException if {@code printed} is {@code null}
     */
    @Override
    public void print(Printed<K, V> printed) {
        Objects.requireNonNull(printed, "printed can't be null");

        final PrintedInternal<K, V> config = new PrintedInternal<>(printed);
        final String processorName = new NamedInternal(config.name()).orElseGenerateWithPrefix(builder, PRINTING_NAME);
        // this.name is the stream's own name, not the processor name generated above.
        final ProcessorParameters<? super K, ? super V> parameters =
            new ProcessorParameters<>(config.build(this.name), processorName);
        final ProcessorGraphNode<? super K, ? super V> printerNode = new ProcessorGraphNode<>(processorName, parameters);

        builder.addGraphNode(streamsGraphNode, printerNode);
    }

    /**
     * Delegates to {@link #flatMap(KeyValueMapper, Named)} with {@code NamedInternal.empty()}.
     *
     * @param mapper maps each record to zero or more key-value pairs
     * @return the stream returned by the named overload
     */
    @Override
    public <KR, VR> KStream<KR, VR> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper) {
        final Named unnamed = NamedInternal.empty();
        return flatMap(mapper, unnamed);
    }

    /**
     * Attaches a {@code KStreamFlatMap} processor below this stream's graph node and marks
     * the node as key changing.
     *
     * @param mapper maps each record to an iterable of key-value pairs; must not be {@code null}
     * @param named  supplies the processor name; must not be {@code null}
     * @return a new stream with {@code null} key and value serdes and the repartition flag
     *         set to {@code true}
     * @throws NullPointerException if {@code mapper} or {@code named} is {@code null}
     */
    @Override
    public <KR, VR> KStream<KR, VR> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAP_NAME);
        final ProcessorParameters<? super K, ? super V> processorParameters =
            new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name);
        final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name, processorParameters);
        flatMapNode.keyChangingOperation(true);
        builder.addGraphNode(streamsGraphNode, flatMapNode);

        // Key and value types both change; type arguments are inferred from the return type
        // (an explicit <K, V> does not match KStream<KR, VR>).
        return new KStreamImpl<>(name, null, null, sourceNodes, true, flatMapNode, builder);
    }

    /**
     * Wraps the key-less mapper with {@code withKey} and delegates to the
     * {@code ValueMapperWithKey} overload.
     *
     * @param mapper maps each value to an iterable of new values
     * @return the stream returned by the delegate overload
     */
    @Override
    public <VR> KStream<K, VR> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper) {
        return flatMapValues(withKey(mapper));
    }

    /**
     * Wraps the key-less mapper with {@code withKey} and delegates to the named
     * {@code ValueMapperWithKey} overload.
     *
     * @param mapper maps each value to an iterable of new values
     * @param named  passed through unchanged to the delegate
     * @return the stream returned by the delegate overload
     */
    @Override
    public <VR> KStream<K, VR> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper, Named named) {
        return flatMapValues(withKey(mapper), named);
    }

    /**
     * Delegates to {@link #flatMapValues(ValueMapperWithKey, Named)} with
     * {@code NamedInternal.empty()}.
     *
     * @param mapper maps each key/value to an iterable of new values
     * @return the stream returned by the named overload
     */
    @Override
    public <VR> KStream<K, VR> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
        // Typed as Named so the call binds to the (ValueMapperWithKey, Named) overload.
        final Named unnamed = NamedInternal.empty();
        return flatMapValues(mapper, unnamed);
    }

    /**
     * Attaches a {@code KStreamFlatMapValues} processor below this stream's graph node and
     * marks the node as value changing.
     *
     * @param mapper maps each key/value to an iterable of new values; must not be {@code null}
     * @param named  supplies the processor name; must not be {@code null}
     * @return a new stream that keeps this stream's key serde and repartition flag but has
     *         a {@code null} value serde
     * @throws NullPointerException if {@code mapper} or {@code named} is {@code null}
     */
    @Override
    public <VR> KStream<K, VR> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAPVALUES_NAME);
        final ProcessorParameters<? super K, ? super V> processorParameters =
            new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name);
        final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name, processorParameters);
        flatMapValuesNode.setValueChangingOperation(true);
        builder.addGraphNode(streamsGraphNode, flatMapValuesNode);

        // The value type changes to VR: infer the type arguments from the return type and
        // drop the value serde.
        return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, flatMapValuesNode, builder);
    }

    /**
     * Branches this stream using {@code NamedInternal.empty()} as the naming source.
     *
     * @param predicates the branch predicates, in order
     * @return one stream per predicate, as produced by {@code doBranch}
     */
    @Override
    public KStream<K, V>[] branch(Predicate<? super K, ? super V> ... predicates) {
        final NamedInternal unnamed = NamedInternal.empty();
        return doBranch(unnamed, predicates);
    }

    /**
     * Branches this stream, taking processor names from {@code name}.
     *
     * @param name       naming configuration; must not be {@code null}
     * @param predicates the branch predicates, in order
     * @return one stream per predicate, as produced by {@code doBranch}
     * @throws NullPointerException if {@code name} is {@code null}
     */
    @Override
    public KStream<K, V>[] branch(Named name, Predicate<? super K, ? super V> ... predicates) {
        Objects.requireNonNull(name, "name can't be null");
        final NamedInternal naming = new NamedInternal(name);
        return doBranch(naming, predicates);
    }

    /**
     * Registers one {@code KStreamBranch} node below this stream's graph node plus one
     * pass-through child node per predicate, and wraps every child in a new stream.
     *
     * @param named      naming source for the branch node and its children
     * @param predicates the branch predicates; must be non-empty and free of {@code null}s
     * @return an array with one stream per predicate, in predicate order
     * @throws IllegalArgumentException if no predicate is given
     * @throws NullPointerException     if any predicate is {@code null}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private KStream<K, V>[] doBranch(NamedInternal named, Predicate<? super K, ? super V> ... predicates) {
        final int branchCount = predicates.length;
        if (branchCount == 0) {
            throw new IllegalArgumentException("you must provide at least one predicate");
        }
        for (int idx = 0; idx < branchCount; idx++) {
            Objects.requireNonNull(predicates[idx], "predicates can't have null values");
        }

        // Names are resolved up front: the parent first, then every child in order.
        final String parentName = named.orElseGenerateWithPrefix(builder, BRANCH_NAME);
        final String[] childNames = new String[branchCount];
        for (int idx = 0; idx < branchCount; idx++) {
            childNames[idx] = named.suffixWithOrElseGet("-predicate-" + idx, builder, BRANCHCHILD_NAME);
        }

        // The processor gets a copy of the caller's predicate array.
        final ProcessorParameters parentParameters =
            new ProcessorParameters(new KStreamBranch((Predicate[]) predicates.clone(), childNames), parentName);
        final ProcessorGraphNode parentNode = new ProcessorGraphNode(parentName, parentParameters);
        builder.addGraphNode(streamsGraphNode, parentNode);

        final KStream<K, V>[] children = (KStream<K, V>[]) Array.newInstance(KStream.class, branchCount);
        for (int idx = 0; idx < branchCount; idx++) {
            final String childName = childNames[idx];
            final ProcessorParameters childParameters = new ProcessorParameters(new KStreamPassThrough(), childName);
            final ProcessorGraphNode childNode = new ProcessorGraphNode(childName, childParameters);
            builder.addGraphNode(parentNode, childNode);
            children[idx] = new KStreamImpl<K, V>(childName, keySerde, valSerde, sourceNodes, repartitionRequired, childNode, builder);
        }
        return children;
    }

    /**
     * Merges {@code stream} into this stream using {@code NamedInternal.empty()} for naming.
     *
     * @param stream the other stream; must not be {@code null}
     * @return the merged stream
     * @throws NullPointerException if {@code stream} is {@code null}
     */
    @Override
    public KStream<K, V> merge(KStream<K, V> stream) {
        Objects.requireNonNull(stream);
        final NamedInternal unnamed = NamedInternal.empty();
        return merge(builder, stream, unnamed);
    }

    /**
     * Merges {@code stream} into this stream, naming the merge processor from
     * {@code processorName}.
     *
     * @param stream        the other stream; must not be {@code null}
     * @param processorName naming configuration wrapped in a {@code NamedInternal}
     * @return the merged stream
     * @throws NullPointerException if {@code stream} is {@code null}
     */
    @Override
    public KStream<K, V> merge(KStream<K, V> stream, Named processorName) {
        Objects.requireNonNull(stream);
        final NamedInternal naming = new NamedInternal(processorName);
        return merge(builder, stream, naming);
    }

    /**
     * Registers a pass-through node, flagged as a merge node, whose parents are this stream's
     * and the other stream's graph nodes.
     *
     * @param builder       the builder used for name generation and node registration
     * @param stream        the other stream; must be a {@code KStreamImpl}
     * @param processorName naming source for the merge node
     * @return a new stream with {@code null} serdes, the union of both source-node sets, and
     *         a repartition flag that is {@code true} if either input requires repartitioning
     */
    private KStream<K, V> merge(InternalStreamsBuilder builder, KStream<K, V> stream, NamedInternal processorName) {
        final KStreamImpl<K, V> other = (KStreamImpl<K, V>) stream;
        final String mergeName = processorName.orElseGenerateWithPrefix(builder, MERGE_NAME);

        final Set<String> combinedSources = new HashSet<>(sourceNodes);
        combinedSources.addAll(other.sourceNodes);
        final boolean needsRepartition = other.repartitionRequired || repartitionRequired;

        final ProcessorParameters<? super K, ? super V> parameters =
            new ProcessorParameters<>(new KStreamPassThrough<>(), mergeName);
        final ProcessorGraphNode<? super K, ? super V> mergeNode = new ProcessorGraphNode<>(mergeName, parameters);
        mergeNode.setMergeNode(true);

        builder.addGraphNode(Arrays.asList(streamsGraphNode, other.streamsGraphNode), mergeNode);
        return new KStreamImpl<>(mergeName, null, null, combinedSources, needsRepartition, mergeNode, builder);
    }

    /**
     * Delegates to {@link #foreach(ForeachAction, Named)} with {@code NamedInternal.empty()}.
     *
     * @param action the action passed on to the named overload
     */
    @Override
    public void foreach(ForeachAction<? super K, ? super V> action) {
        final Named unnamed = NamedInternal.empty();
        foreach(action, unnamed);
    }

    /**
     * Attaches a {@code KStreamPeek} (constructed with its boolean flag set to {@code false})
     * below this stream's graph node. No stream is returned.
     *
     * @param action the action given to the processor; must not be {@code null}
     * @param named  supplies the processor name; must not be {@code null}
     * @throws NullPointerException if {@code action} or {@code named} is {@code null}
     */
    @Override
    public void foreach(ForeachAction<? super K, ? super V> action, Named named) {
        Objects.requireNonNull(action, "action can't be null");
        // Added for consistency with peek/filter/map: reject a null `named` up front with
        // the same message the sibling operations use.
        Objects.requireNonNull(named, "named can't be null");

        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FOREACH_NAME);
        final ProcessorParameters<? super K, ? super V> processorParameters =
            new ProcessorParameters<>(new KStreamPeek<>(action, false), name);
        final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name, processorParameters);
        builder.addGraphNode(streamsGraphNode, foreachNode);
    }

    /**
     * Delegates to {@link #peek(ForeachAction, Named)} with {@code NamedInternal.empty()}.
     *
     * @param action the action passed on to the named overload
     * @return the stream returned by the named overload
     */
    @Override
    public KStream<K, V> peek(ForeachAction<? super K, ? super V> action) {
        final Named unnamed = NamedInternal.empty();
        return peek(action, unnamed);
    }

    /**
     * Attaches a {@code KStreamPeek} (constructed with its boolean flag set to {@code true})
     * below this stream's graph node.
     *
     * @param action the action given to the processor; must not be {@code null}
     * @param named  supplies the processor name; must not be {@code null}
     * @return a new stream on the peek node that keeps this stream's serdes, source nodes
     *         and repartition flag
     * @throws NullPointerException if {@code action} or {@code named} is {@code null}
     */
    @Override
    public KStream<K, V> peek(ForeachAction<? super K, ? super V> action, Named named) {
        Objects.requireNonNull(action, "action can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME);
        final ProcessorParameters<? super K, ? super V> parameters =
            new ProcessorParameters<>(new KStreamPeek<>(action, true), processorName);
        final ProcessorGraphNode<? super K, ? super V> node = new ProcessorGraphNode<>(processorName, parameters);

        builder.addGraphNode(streamsGraphNode, node);
        return new KStreamImpl<>(processorName, keySerde, valSerde, sourceNodes, repartitionRequired, node, builder);
    }

    /**
     * Delegates to {@link #through(String, Produced)} with a {@code Produced} built from this
     * stream's serdes and a {@code null} third argument.
     *
     * @param topic the topic name
     * @return the stream returned by the delegate overload
     */
    @Override
    public KStream<K, V> through(String topic) {
        final Produced<K, V> defaults = Produced.with(keySerde, valSerde, null);
        return through(topic, defaults);
    }

    /**
     * Writes this stream to {@code topic} via {@link #to(String, Produced)} and then creates
     * a new stream from that same topic through the builder.
     *
     * @param topic    the topic name; must not be {@code null}
     * @param produced sink configuration; serdes it leaves {@code null} are filled from this stream
     * @return the stream the builder creates for {@code topic}, using the resolved serdes and
     *         a {@code FailOnInvalidTimestamp} extractor
     * @throws NullPointerException if {@code topic} or {@code produced} is {@code null}
     */
    @Override
    public KStream<K, V> through(String topic, Produced<K, V> produced) {
        Objects.requireNonNull(topic, "topic can't be null");
        Objects.requireNonNull(produced, "Produced can't be null");

        final ProducedInternal<K, V> sinkConfig = new ProducedInternal<K, V>(produced);
        // Fall back to this stream's serdes where the caller supplied none.
        if (sinkConfig.keySerde() == null) {
            sinkConfig.withKeySerde(keySerde);
        }
        if (sinkConfig.valueSerde() == null) {
            sinkConfig.withValueSerde(valSerde);
        }
        to(topic, sinkConfig);

        final ConsumedInternal<K, V> sourceConfig =
            new ConsumedInternal<K, V>(sinkConfig.keySerde(), sinkConfig.valueSerde(), new FailOnInvalidTimestamp(), null);
        return builder.stream(Collections.singleton(topic), sourceConfig);
    }

    /**
     * Delegates to {@link #to(String, Produced)} with a {@code Produced} built from this
     * stream's serdes and a {@code null} third argument.
     *
     * @param topic the topic name
     */
    @Override
    public void to(String topic) {
        final Produced<K, V> defaults = Produced.with(keySerde, valSerde, null);
        to(topic, defaults);
    }

    /**
     * Adds a sink for a fixed topic by wrapping {@code topic} in a
     * {@code StaticTopicNameExtractor}.
     *
     * @param topic    the topic name; must not be {@code null}
     * @param produced sink configuration; serdes it leaves {@code null} are filled from this stream
     * @throws NullPointerException if {@code topic} or {@code produced} is {@code null}
     */
    @Override
    public void to(String topic, Produced<K, V> produced) {
        Objects.requireNonNull(topic, "topic can't be null");
        Objects.requireNonNull(produced, "Produced can't be null");

        final ProducedInternal<K, V> sinkConfig = new ProducedInternal<K, V>(produced);
        // Fall back to this stream's serdes where the caller supplied none.
        if (sinkConfig.keySerde() == null) {
            sinkConfig.withKeySerde(keySerde);
        }
        if (sinkConfig.valueSerde() == null) {
            sinkConfig.withValueSerde(valSerde);
        }

        final TopicNameExtractor<K, V> fixedTopic = new StaticTopicNameExtractor<>(topic);
        to(fixedTopic, sinkConfig);
    }

    /**
     * Delegates to {@link #to(TopicNameExtractor, Produced)} with a {@code Produced} built
     * from this stream's serdes and a {@code null} third argument.
     *
     * @param topicExtractor chooses the destination topic
     */
    @Override
    public void to(TopicNameExtractor<K, V> topicExtractor) {
        final Produced<K, V> defaults = Produced.with(keySerde, valSerde, null);
        to(topicExtractor, defaults);
    }

    /**
     * Adds a sink whose destination topic is chosen by {@code topicExtractor}.
     *
     * @param topicExtractor chooses the destination topic; must not be {@code null}
     * @param produced       sink configuration; serdes it leaves {@code null} are filled from this stream
     * @throws NullPointerException if {@code topicExtractor} or {@code produced} is {@code null}
     */
    @Override
    public void to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced) {
        Objects.requireNonNull(topicExtractor, "topic extractor can't be null");
        Objects.requireNonNull(produced, "Produced can't be null");

        final ProducedInternal<K, V> sinkConfig = new ProducedInternal<K, V>(produced);
        // Fall back to this stream's serdes where the caller supplied none.
        if (sinkConfig.keySerde() == null) {
            sinkConfig.withKeySerde(keySerde);
        }
        if (sinkConfig.valueSerde() == null) {
            sinkConfig.withValueSerde(valSerde);
        }

        // The static type ProducedInternal selects the private overload.
        to(topicExtractor, sinkConfig);
    }

    /**
     * Registers a {@code StreamSinkNode} below this stream's graph node.
     *
     * <p>This is a private helper, so it carries no {@code @Override}: a private method
     * cannot override or implement anything, and the annotation would be rejected by the
     * compiler.
     *
     * @param topicExtractor chooses the destination topic
     * @param produced       resolved sink configuration; its name, if any, becomes the node
     *                       name, otherwise one is generated with the sink prefix
     */
    private void to(TopicNameExtractor<K, V> topicExtractor, ProducedInternal<K, V> produced) {
        final String name = new NamedInternal(produced.name()).orElseGenerateWithPrefix(builder, SINK_NAME);
        final StreamSinkNode<K, V> sinkNode = new StreamSinkNode<>(name, topicExtractor, produced);
        builder.addGraphNode(streamsGraphNode, sinkNode);
    }

    /**
     * Wraps the supplier in a {@code TransformerSupplierAdapter} and delegates to
     * {@code flatTransform} under a name generated with the transform prefix.
     *
     * @param transformerSupplier the supplier to adapt; must not be {@code null}
     * @param stateStoreNames     state store names passed through to {@code flatTransform}
     * @return the stream returned by {@code flatTransform}
     * @throws NullPointerException if {@code transformerSupplier} is {@code null}
     */
    @Override
    public <KR, VR> KStream<KR, VR> transform(TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        final String generatedName = builder.newProcessorName(TRANSFORM_NAME);
        return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), Named.as(generatedName), stateStoreNames);
    }

    /**
     * Wraps the supplier in a {@code TransformerSupplierAdapter} and delegates to the named
     * {@code flatTransform} overload.
     *
     * @param transformerSupplier the supplier to adapt; must not be {@code null}
     * @param named               passed through unchanged to {@code flatTransform}
     * @param stateStoreNames     state store names passed through to {@code flatTransform}
     * @return the stream returned by {@code flatTransform}
     * @throws NullPointerException if {@code transformerSupplier} is {@code null}
     */
    @Override
    public <KR, VR> KStream<KR, VR> transform(TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), named, stateStoreNames);
    }

    /**
     * Delegates to the named {@code flatTransform} overload under a name generated with the
     * transform prefix.
     *
     * @param transformerSupplier the transformer supplier; must not be {@code null}
     * @param stateStoreNames     state store names passed through to the delegate
     * @return the stream returned by the named overload
     * @throws NullPointerException if {@code transformerSupplier} is {@code null}
     */
    @Override
    public <K1, V1> KStream<K1, V1> flatTransform(TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        final String generatedName = builder.newProcessorName(TRANSFORM_NAME);
        return flatTransform(transformerSupplier, Named.as(generatedName), stateStoreNames);
    }

    /**
     * Attaches a stateful {@code KStreamFlatTransform} node below this stream's graph node
     * and marks it as key changing. The node name is taken directly from {@code named}.
     *
     * @param transformerSupplier the transformer supplier; must not be {@code null}
     * @param named               supplies the node name; must not be {@code null}
     * @param stateStoreNames     state store names handed to the node
     * @return a new stream with {@code null} key and value serdes and the repartition flag
     *         set to {@code true}
     * @throws NullPointerException if {@code transformerSupplier} or {@code named} is {@code null}
     */
    @Override
    public <K1, V1> KStream<K1, V1> flatTransform(TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String name = new NamedInternal(named).name();
        final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
            name,
            new ProcessorParameters<>(new KStreamFlatTransform<>(transformerSupplier), name),
            stateStoreNames);
        transformNode.keyChangingOperation(true);
        builder.addGraphNode(streamsGraphNode, transformNode);

        // Key and value types both change; type arguments are inferred from the return type
        // (an explicit <K, V> does not match KStream<K1, V1>).
        return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder);
    }

    /**
     * Converts the supplier with {@code toValueTransformerWithKeySupplier} and delegates to
     * {@code doTransformValues} with {@code NamedInternal.empty()}.
     *
     * @param valueTransformerSupplier the supplier; must not be {@code null}
     * @param stateStoreNames          state store names passed through
     * @return the stream returned by {@code doTransformValues}
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    public <VR> KStream<K, VR> transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), NamedInternal.empty(), stateStoreNames);
    }

    /**
     * Converts the supplier with {@code toValueTransformerWithKeySupplier} and delegates to
     * {@code doTransformValues} with the given name.
     *
     * @param valueTransformerSupplier the supplier; must not be {@code null}
     * @param named                    naming configuration; must not be {@code null}
     * @param stateStoreNames          state store names passed through
     * @return the stream returned by {@code doTransformValues}
     * @throws NullPointerException if {@code valueTransformerSupplier} or {@code named} is {@code null}
     */
    @Override
    public <VR> KStream<K, VR> transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), new NamedInternal(named), stateStoreNames);
    }

    /**
     * Delegates to {@code doTransformValues} with {@code NamedInternal.empty()}.
     *
     * @param valueTransformerSupplier the key-aware supplier; must not be {@code null}
     * @param stateStoreNames          state store names passed through
     * @return the stream returned by {@code doTransformValues}
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    public <VR> KStream<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        final NamedInternal unnamed = NamedInternal.empty();
        return doTransformValues(valueTransformerSupplier, unnamed, stateStoreNames);
    }

    /**
     * Delegates to {@code doTransformValues} with the given name.
     *
     * @param valueTransformerSupplier the key-aware supplier; must not be {@code null}
     * @param named                    naming configuration; must not be {@code null}
     * @param stateStoreNames          state store names passed through
     * @return the stream returned by {@code doTransformValues}
     * @throws NullPointerException if {@code valueTransformerSupplier} or {@code named} is {@code null}
     */
    @Override
    public <VR> KStream<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        final NamedInternal naming = new NamedInternal(named);
        return doTransformValues(valueTransformerSupplier, naming, stateStoreNames);
    }

    /**
     * Attaches a stateful {@code KStreamTransformValues} node below this stream's graph node
     * and marks it as value changing.
     *
     * @param valueTransformerWithKeySupplier the key-aware value transformer supplier
     * @param named                           supplies the node name, or a generated one using
     *                                        the transform-values prefix
     * @param stateStoreNames                 state store names handed to the node
     * @return a new stream that keeps this stream's key serde and repartition flag but has
     *         a {@code null} value serde
     */
    private <VR> KStream<K, VR> doTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, NamedInternal named, String ... stateStoreNames) {
        final String name = named.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
        final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
            name,
            new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
            stateStoreNames);
        transformNode.setValueChangingOperation(true);
        builder.addGraphNode(streamsGraphNode, transformNode);

        // The value type changes to VR: infer the type arguments from the return type
        // (an explicit <K, V> does not match KStream<K, VR>) and drop the value serde.
        return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, transformNode, builder);
    }

    /**
     * Converts the supplier with {@code toValueTransformerWithKeySupplier} and delegates to
     * {@code doFlatTransformValues} with {@code NamedInternal.empty()}.
     *
     * @param valueTransformerSupplier the supplier; must not be {@code null}
     * @param stateStoreNames          state store names passed through
     * @return the stream returned by {@code doFlatTransformValues}
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), NamedInternal.empty(), stateStoreNames);
    }

    /**
     * Converts the supplier with {@code toValueTransformerWithKeySupplier} and delegates to
     * {@code doFlatTransformValues} with the given name.
     *
     * @param valueTransformerSupplier the supplier; must not be {@code null}
     * @param named                    passed through unchanged to the delegate
     * @param stateStoreNames          state store names passed through
     * @return the stream returned by {@code doFlatTransformValues}
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), named, stateStoreNames);
    }

    /**
     * Delegates to {@code doFlatTransformValues} with {@code NamedInternal.empty()}.
     *
     * @param valueTransformerSupplier the key-aware supplier; must not be {@code null}
     * @param stateStoreNames          state store names passed through
     * @return the stream returned by {@code doFlatTransformValues}
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        final Named unnamed = NamedInternal.empty();
        return doFlatTransformValues(valueTransformerSupplier, unnamed, stateStoreNames);
    }

    /**
     * Delegates to {@code doFlatTransformValues} with the given name.
     *
     * @param valueTransformerSupplier the key-aware supplier; must not be {@code null}
     * @param named                    passed through unchanged to the delegate
     * @param stateStoreNames          state store names passed through
     * @return the stream returned by {@code doFlatTransformValues}
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        final KStream<K, VR> result = doFlatTransformValues(valueTransformerSupplier, named, stateStoreNames);
        return result;
    }

    /**
     * Attaches a stateful {@code KStreamFlatTransformValues} node below this stream's graph
     * node and marks it as value changing.
     *
     * @param valueTransformerWithKeySupplier the key-aware value transformer supplier
     * @param named                           naming configuration; wrapped in a
     *                                        {@code NamedInternal}, with a generated name using
     *                                        the transform-values prefix as fallback
     * @param stateStoreNames                 state store names handed to the node
     * @return a new stream that keeps this stream's key serde and repartition flag but has
     *         a {@code null} value serde
     */
    private <VR> KStream<K, VR> doFlatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier, Named named, String ... stateStoreNames) {
        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
        final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
            name,
            new ProcessorParameters<>(new KStreamFlatTransformValues<>(valueTransformerWithKeySupplier), name),
            stateStoreNames);
        transformNode.setValueChangingOperation(true);
        builder.addGraphNode(streamsGraphNode, transformNode);

        // The value type changes to VR: infer the type arguments from the return type
        // (an explicit <K, V> does not match KStream<K, VR>) and drop the value serde.
        return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, transformNode, builder);
    }

    /**
     * Delegates to the named {@code process} overload under a name generated with the
     * processor prefix.
     *
     * @param processorSupplier the processor supplier; must not be {@code null}
     * @param stateStoreNames   state store names passed through
     * @throws NullPointerException if {@code processorSupplier} is {@code null}
     */
    @Override
    public void process(ProcessorSupplier<? super K, ? super V> processorSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(processorSupplier, "ProcessSupplier cant' be null");
        final String generatedName = builder.newProcessorName(PROCESSOR_NAME);
        process(processorSupplier, Named.as(generatedName), stateStoreNames);
    }

    /**
     * Attaches a stateful processor node below this stream's graph node. The node name is
     * taken directly from {@code named}; no stream is returned.
     *
     * @param processorSupplier the processor supplier; must not be {@code null}
     * @param named             supplies the node name; must not be {@code null}
     * @param stateStoreNames   state store names handed to the node
     * @throws NullPointerException if {@code processorSupplier} or {@code named} is {@code null}
     */
    @Override
    public void process(ProcessorSupplier<? super K, ? super V> processorSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(processorSupplier, "ProcessSupplier cant' be null");
        Objects.requireNonNull(named, "named cant' be null");

        final String nodeName = new NamedInternal(named).name();
        final StatefulProcessorNode<? super K, ? super V> node = new StatefulProcessorNode<>(
            nodeName,
            new ProcessorParameters<>(processorSupplier, nodeName),
            stateStoreNames);
        builder.addGraphNode(streamsGraphNode, node);
    }

    /**
     * Delegates to the {@code Joined} overload of {@code join} with a {@code Joined} whose
     * three arguments are all {@code null}.
     *
     * @param other   the stream to join with
     * @param joiner  combines the two values
     * @param windows the join windows
     * @return the stream returned by the delegate overload
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        final Joined<K, V, VO> noSerdes = Joined.with(null, null, null);
        return join(other, joiner, windows, noSerdes);
    }

    /**
     * Deprecated {@code Joined}-based windowed join. Copies the serdes and name of {@code joined}
     * into a {@code StreamJoined} and delegates to the {@code StreamJoined} overload.
     *
     * @throws NullPointerException if {@code joined} is null
     */
    @Override
    @Deprecated
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, Joined<K, V, VO> joined) {
        Objects.requireNonNull(joined, "Joined can't be null");
        final JoinedInternal<K, V, VO> settings = new JoinedInternal<>(joined);
        final StreamJoined<K, V, VO> converted = StreamJoined
            .with(settings.keySerde(), settings.valueSerde(), settings.otherValueSerde())
            .withName(settings.name());
        return join(otherStream, joiner, windows, converted);
    }

    /**
     * Windowed stream-stream join configured through {@code StreamJoined}.
     * Delegates to {@code doJoin} with a {@code KStreamImplJoin} built with both flags false
     * (presumably "left outer" / "right outer" — confirm against {@code KStreamImplJoin}).
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) {
        final KStreamImplJoin joinImpl = new KStreamImplJoin(this.builder, false, false);
        return doJoin(otherStream, joiner, windows, streamJoined, joinImpl);
    }

    /**
     * Windowed outer join without explicit serdes or name: delegates to the
     * {@code Joined}-based overload with all three serdes left null.
     */
    @Override
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        final Joined<K, V, VO> noSerdes = Joined.with(null, null, null);
        return outerJoin(other, joiner, windows, noSerdes);
    }

    /**
     * Deprecated {@code Joined}-based windowed outer join. Copies the serdes and name of
     * {@code joined} into a {@code StreamJoined} and delegates to the {@code StreamJoined} overload.
     *
     * @throws NullPointerException if {@code joined} is null
     */
    @Override
    @Deprecated
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, Joined<K, V, VO> joined) {
        Objects.requireNonNull(joined, "Joined can't be null");
        final JoinedInternal<K, V, VO> settings = new JoinedInternal<>(joined);
        final StreamJoined<K, V, VO> converted = StreamJoined
            .with(settings.keySerde(), settings.valueSerde(), settings.otherValueSerde())
            .withName(settings.name());
        return outerJoin(other, joiner, windows, converted);
    }

    /**
     * Windowed outer join configured through {@code StreamJoined}.
     * Delegates to {@code doJoin} with a {@code KStreamImplJoin} built with both flags true
     * (presumably "left outer" / "right outer" — confirm against {@code KStreamImplJoin}).
     */
    @Override
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) {
        final KStreamImplJoin joinImpl = new KStreamImplJoin(this.builder, true, true);
        return doJoin(other, joiner, windows, streamJoined, joinImpl);
    }

    /**
     * Shared implementation of the windowed stream-stream joins.
     *
     * <p>Validates the arguments, repartitions whichever side is flagged {@code repartitionRequired}
     * (topic names derived from the {@code StreamJoined} name with a {@code -left} / {@code -right}
     * suffix, falling back to the respective stream's own name), checks that both sides are
     * joinable, and hands the actual join over to {@code join}.
     *
     * @param other        right-hand stream; cast to {@code KStreamImpl}
     * @param joiner       combines a value of each side into the result value
     * @param windows      join window specification
     * @param streamJoined serdes and naming for the join
     * @param join         join strategy that builds the resulting stream
     * @return the joined stream produced by {@code join}
     * @throws NullPointerException if {@code other}, {@code joiner}, {@code windows} or {@code streamJoined} is null
     */
    private <VO, VR> KStream<K, VR> doJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined, KStreamImplJoin join) {
        Objects.requireNonNull(other, "other KStream can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(windows, "windows can't be null");
        Objects.requireNonNull(streamJoined, "streamJoined can't be null");

        KStreamImpl<K, V> left = this;
        KStreamImpl<K, VO> right = (KStreamImpl<K, VO>) other;

        final StreamJoinedInternal<K, V, VO> settings = new StreamJoinedInternal<K, V, VO>(streamJoined);
        final NamedInternal joinName = new NamedInternal(settings.name());

        if (left.repartitionRequired) {
            final String leftTopicName = joinName.suffixWithOrElseGet("-left", left.name);
            left = left.repartitionForJoin(leftTopicName, settings.keySerde(), settings.valueSerde());
        }

        if (right.repartitionRequired) {
            final String rightTopicName = joinName.suffixWithOrElseGet("-right", right.name);
            right = right.repartitionForJoin(rightTopicName, settings.keySerde(), settings.otherValueSerde());
        }

        left.ensureJoinableWith(right);
        return join.join(left, right, joiner, windows, streamJoined);
    }

    /**
     * Inserts a repartition node below this stream and returns a stream reading from the
     * repartitioned source.
     *
     * @param repartitionName    prefix for the repartition topic name
     * @param keySerdeOverride   key serde to use; this stream's key serde is used when null
     * @param valueSerdeOverride value serde to use; this stream's value serde is used when null
     * @return a stream whose only source node is the repartitioned source and whose
     *         {@code repartitionRequired} flag is false
     */
    private KStreamImpl<K, V> repartitionForJoin(String repartitionName, Serde<K> keySerdeOverride, Serde<V> valueSerdeOverride) {
        final Serde<K> effectiveKeySerde;
        if (keySerdeOverride != null) {
            effectiveKeySerde = keySerdeOverride;
        } else {
            effectiveKeySerde = this.keySerde;
        }
        final Serde<V> effectiveValueSerde;
        if (valueSerdeOverride != null) {
            effectiveValueSerde = valueSerdeOverride;
        } else {
            effectiveValueSerde = this.valSerde;
        }

        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> nodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
        final String sourceName = KStreamImpl.createRepartitionedSource(this.builder, effectiveKeySerde, effectiveValueSerde, repartitionName, nodeBuilder);

        final OptimizableRepartitionNode<K, V> repartitionNode = nodeBuilder.build();
        this.builder.addGraphNode(this.streamsGraphNode, repartitionNode);

        return new KStreamImpl<K, V>(sourceName, effectiveKeySerde, effectiveValueSerde, Collections.singleton(sourceName), false, repartitionNode, this.builder);
    }

    /**
     * Configures {@code optimizableRepartitionNodeBuilder} for a repartition through an internal topic.
     *
     * <p>Generates sink, filter and source processor names (in that order), installs a filter that
     * drops records whose key is null, and uses the generated source name as the node name too.
     *
     * @param builder                           supplies the generated processor names
     * @param keySerde                          key serde set on the node builder
     * @param valSerde                          value serde set on the node builder
     * @param repartitionTopicNamePrefix        topic name prefix; {@code REPARTITION_TOPIC_SUFFIX} is appended
     * @param optimizableRepartitionNodeBuilder node builder to configure
     * @return the generated source processor name
     */
    static <K1, V1> String createRepartitionedSource(InternalStreamsBuilder builder, Serde<K1> keySerde, Serde<V1> valSerde, String repartitionTopicNamePrefix, OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) {
        final String topicName = repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;

        // Name generation order is kept as sink, filter, source.
        final String sinkProcessorName = builder.newProcessorName(SINK_NAME);
        final String filterProcessorName = builder.newProcessorName(FILTER_NAME);
        final String sourceProcessorName = builder.newProcessorName(SOURCE_NAME);

        final Predicate<Object, Object> keyIsPresent = (key, value) -> key != null;
        final KStreamFilter<Object, Object> nullKeyFilter = new KStreamFilter<Object, Object>(keyIsPresent, false);
        final ProcessorParameters<Object, Object> filterParameters = new ProcessorParameters<Object, Object>(nullKeyFilter, filterProcessorName);

        optimizableRepartitionNodeBuilder
            .withKeySerde(keySerde)
            .withValueSerde(valSerde)
            .withSourceName(sourceProcessorName)
            .withRepartitionTopic(topicName)
            .withSinkName(sinkProcessorName)
            .withProcessorParameters(filterParameters)
            // The graph node reuses the source processor name.
            .withNodeName(sourceProcessorName);

        return sourceProcessorName;
    }

    /**
     * Windowed left join without explicit serdes or name: delegates to the
     * {@code Joined}-based overload with all three serdes left null.
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        final Joined<K, V, VO> noSerdes = Joined.with(null, null, null);
        return leftJoin(other, joiner, windows, noSerdes);
    }

    /**
     * Deprecated {@code Joined}-based windowed left join. Copies the serdes and name of
     * {@code joined} into a {@code StreamJoined} and delegates to the {@code StreamJoined} overload.
     *
     * @throws NullPointerException if {@code joined} is null
     */
    @Override
    @Deprecated
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, Joined<K, V, VO> joined) {
        Objects.requireNonNull(joined, "Joined can't be null");
        final JoinedInternal<K, V, VO> settings = new JoinedInternal<>(joined);
        final StreamJoined<K, V, VO> converted = StreamJoined
            .with(settings.keySerde(), settings.valueSerde(), settings.otherValueSerde())
            .withName(settings.name());
        return leftJoin(other, joiner, windows, converted);
    }

    /**
     * Windowed left join configured through {@code StreamJoined}.
     * Delegates to {@code doJoin} with a {@code KStreamImplJoin} built with flags (true, false)
     * (presumably "left outer" / "right outer" — confirm against {@code KStreamImplJoin}).
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) {
        final KStreamImplJoin joinImpl = new KStreamImplJoin(this.builder, true, false);
        return doJoin(other, joiner, windows, streamJoined, joinImpl);
    }

    /**
     * Stream-table join without explicit serdes or name: delegates to the
     * {@code Joined}-based overload with all three serdes left null.
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
        final Joined<K, V, VO> noSerdes = Joined.with(null, null, null);
        return join(other, joiner, noSerdes);
    }

    /**
     * Inner stream-table join.
     *
     * <p>When this stream is flagged {@code repartitionRequired}, it is repartitioned first and the
     * join is built on the repartitioned stream. The repartition topic name is the {@code Joined}
     * name if set, otherwise this stream's name.
     *
     * @param other  table to join with; must not be null
     * @param joiner combines a stream value and a table value; must not be null
     * @param joined serdes and optional name for the join; must not be null
     * @return the joined stream
     * @throws NullPointerException if any argument is null
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined) {
        Objects.requireNonNull(other, "other can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");
        final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<K, V, VO>(joined);
        final String name = joinedInternal.name();
        if (!this.repartitionRequired) {
            return this.doStreamTableJoin(other, joiner, joined, false);
        }
        final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(name != null ? name : this.name, joined.keySerde(), joined.valueSerde());
        // Join on the repartitioned stream; the previous code dropped it and called through "super.".
        return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, false);
    }

    /**
     * Stream-table left join without explicit serdes or name: delegates to the
     * {@code Joined}-based overload with all three serdes left null.
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
        final Joined<K, V, VO> noSerdes = Joined.with(null, null, null);
        return leftJoin(other, joiner, noSerdes);
    }

    /**
     * Left stream-table join.
     *
     * <p>When this stream is flagged {@code repartitionRequired}, it is repartitioned first and the
     * join is built on the repartitioned stream. The repartition topic name is the {@code Joined}
     * name if set, otherwise this stream's name.
     *
     * @param other  table to join with; must not be null
     * @param joiner combines a stream value and a table value; must not be null
     * @param joined serdes and optional name for the join; must not be null
     * @return the joined stream
     * @throws NullPointerException if any argument is null
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined) {
        Objects.requireNonNull(other, "other can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");
        final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<K, V, VO>(joined);
        final String internalName = joinedInternal.name();
        if (!this.repartitionRequired) {
            return this.doStreamTableJoin(other, joiner, joined, true);
        }
        final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(internalName != null ? internalName : this.name, joined.keySerde(), joined.valueSerde());
        // Join on the repartitioned stream; the previous code dropped it and called through "super.".
        return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, true);
    }

    /**
     * Inner join against a global table with no explicit operation name
     * ({@code NamedInternal.empty()} is passed on).
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> join(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
        final Named unnamed = NamedInternal.empty();
        return globalTableJoin(globalTable, keyMapper, joiner, false, unnamed);
    }

    /**
     * Inner join against a global table using the supplied operation name.
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> join(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, ValueJoiner<? super V, ? super VG, ? extends VR> joiner, Named named) {
        final boolean isLeftJoin = false;
        return globalTableJoin(globalTable, keyMapper, joiner, isLeftJoin, named);
    }

    /**
     * Left join against a global table with no explicit operation name
     * ({@code NamedInternal.empty()} is passed on).
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> leftJoin(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
        final Named unnamed = NamedInternal.empty();
        return globalTableJoin(globalTable, keyMapper, joiner, true, unnamed);
    }

    /**
     * Left join against a global table using the supplied operation name.
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> leftJoin(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, ValueJoiner<? super V, ? super VG, ? extends VR> joiner, Named named) {
        final boolean isLeftJoin = true;
        return globalTableJoin(globalTable, keyMapper, joiner, isLeftJoin, named);
    }

    /**
     * Shared implementation of the global-table joins: adds a {@code StreamTableJoinNode} running a
     * {@code KStreamGlobalKTableJoin} processor below this stream's graph node.
     *
     * @param globalTable table to join with; cast to {@code GlobalKTableImpl}
     * @param keyMapper   maps a stream record to the global table key
     * @param joiner      combines a stream value and a table value
     * @param leftJoin    forwarded to the join processor to select left-join behaviour
     * @param named       operation name; resolved through
     *                    {@code NamedInternal#orElseGenerateWithPrefix} with the
     *                    {@code LEFTJOIN_NAME} prefix for both join kinds
     * @return the joined stream; it keeps this stream's key serde, source nodes and
     *         {@code repartitionRequired} flag
     * @throws NullPointerException if {@code globalTable}, {@code keyMapper}, {@code joiner} or {@code named} is null
     */
    private <KG, VG, VR> KStream<K, VR> globalTableJoin(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, ValueJoiner<? super V, ? super VG, ? extends VR> joiner, boolean leftJoin, Named named) {
        Objects.requireNonNull(globalTable, "globalTable can't be null");
        Objects.requireNonNull(keyMapper, "keyMapper can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(named, "named can't be null");
        KTableValueGetterSupplier valueGetterSupplier = ((GlobalKTableImpl)globalTable).valueGetterSupplier();
        String name = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, LEFTJOIN_NAME);
        KStreamGlobalKTableJoin<? super K, ? extends KG, ? extends VR, ? super V, ? super VG> processorSupplier = new KStreamGlobalKTableJoin<K, KG, VR, V, VG>(valueGetterSupplier, joiner, keyMapper, leftJoin);
        ProcessorParameters processorParameters = new ProcessorParameters(processorSupplier, name);
        // No state store names and no "other join side" name are registered for a global-table join.
        StreamTableJoinNode streamTableJoinNode = new StreamTableJoinNode(name, processorParameters, new String[0], null);
        this.builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
        // The joined value type is VR, so the result is typed <K, VR> and has no value serde (null).
        return new KStreamImpl<K, VR>(name, this.keySerde, null, this.sourceNodes, this.repartitionRequired, streamTableJoinNode, this.builder);
    }

    /**
     * Shared implementation of the stream-table joins: adds a {@code StreamTableJoinNode} running a
     * {@code KStreamKTableJoin} processor below this stream's graph node.
     *
     * @param other    table to join with; cast to {@code KTableImpl}
     * @param joiner   combines a stream value and a table value
     * @param joined   supplies the optional operation name and key serde
     * @param leftJoin selects the {@code LEFTJOIN_NAME} prefix over {@code JOIN_NAME} for a generated
     *                 name and is forwarded to the join processor
     * @return the joined stream; its source nodes are the set returned by {@code ensureJoinableWith}
     *         and its {@code repartitionRequired} flag is false
     * @throws NullPointerException if {@code other} or {@code joiner} is null
     */
    private <VO, VR> KStream<K, VR> doStreamTableJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined, boolean leftJoin) {
        Objects.requireNonNull(other, "other KTable can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Set<String> allSourceNodes = this.ensureJoinableWith((AbstractStream)((Object)other));
        JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<K, V, VO>(joined);
        NamedInternal renamed = new NamedInternal(joinedInternal.name());
        String name = renamed.orElseGenerateWithPrefix(this.builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
        KStreamKTableJoin processorSupplier = new KStreamKTableJoin(((KTableImpl)other).valueGetterSupplier(), joiner, leftJoin);
        ProcessorParameters processorParameters = new ProcessorParameters(processorSupplier, name);
        // "this.name" (the stream's own name) is passed as the last node argument, not the local join name.
        StreamTableJoinNode streamTableJoinNode = new StreamTableJoinNode(name, processorParameters, ((KTableImpl)other).valueGetterSupplier().storeNames(), this.name);
        this.builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
        // The joined value type is VR, so the result is typed <K, VR> and has no value serde (null).
        return new KStreamImpl<K, VR>(name, joined.keySerde() != null ? joined.keySerde() : this.keySerde, null, allSourceNodes, false, streamTableJoinNode, this.builder);
    }

    /**
     * Groups by a new key using a null key serde and this stream's value serde.
     */
    @Override
    public <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> selector) {
        final Grouped<KR, V> defaults = Grouped.with(null, this.valSerde);
        return groupBy(selector, defaults);
    }

    /**
     * Deprecated {@code Serialized}-based variant: copies the serdes into a {@code Grouped} and
     * delegates to the {@code Grouped} overload.
     *
     * @throws NullPointerException if {@code selector} or {@code serialized} is null
     */
    @Override
    @Deprecated
    public <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> selector, Serialized<KR, V> serialized) {
        Objects.requireNonNull(selector, "selector can't be null");
        Objects.requireNonNull(serialized, "serialized can't be null");
        final SerializedInternal<KR, V> serdes = new SerializedInternal<KR, V>(serialized);
        final Grouped<KR, V> grouped = Grouped.with(serdes.keySerde(), serdes.valueSerde());
        return groupBy(selector, grouped);
    }

    /**
     * Groups the stream by a key computed with {@code selector}.
     *
     * <p>Adds a select-key node (marked as key-changing) below this stream's graph node and returns
     * a grouped stream on top of it with its repartition flag set to true.
     *
     * @throws NullPointerException if {@code selector} or {@code grouped} is null
     */
    @Override
    public <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> selector, Grouped<KR, V> grouped) {
        Objects.requireNonNull(selector, "selector can't be null");
        Objects.requireNonNull(grouped, "grouped can't be null");

        final GroupedInternal<KR, V> settings = new GroupedInternal<KR, V>(grouped);
        final NamedInternal selectKeyName = new NamedInternal(settings.name());
        final ProcessorGraphNode<K, V> selectKeyNode = this.internalSelectKey(selector, selectKeyName);
        selectKeyNode.keyChangingOperation(true);
        this.builder.addGraphNode(this.streamsGraphNode, selectKeyNode);

        final String groupedName = selectKeyNode.nodeName();
        return new KGroupedStreamImpl<KR, V>(groupedName, this.sourceNodes, settings, true, selectKeyNode, this.builder);
    }

    /**
     * Groups by the existing key using this stream's own key and value serdes.
     */
    @Override
    public KGroupedStream<K, V> groupByKey() {
        final Grouped<K, V> ownSerdes = Grouped.with(this.keySerde, this.valSerde);
        return groupByKey(ownSerdes);
    }

    /**
     * Deprecated {@code Serialized}-based variant: copies the serdes into a {@code Grouped} and
     * delegates to the {@code Grouped} overload.
     *
     * @param serialized key and value serdes; must not be null
     * @throws NullPointerException if {@code serialized} is null
     */
    @Override
    @Deprecated
    public KGroupedStream<K, V> groupByKey(Serialized<K, V> serialized) {
        // Explicit check with the same message groupBy(selector, serialized) uses.
        Objects.requireNonNull(serialized, "serialized can't be null");
        final SerializedInternal<K, V> serializedInternal = new SerializedInternal<K, V>(serialized);
        return this.groupByKey(Grouped.with(serializedInternal.keySerde(), serializedInternal.valueSerde()));
    }

    /**
     * Groups by the existing key. No graph node is added: the grouped stream reuses this stream's
     * name, source nodes, graph node and {@code repartitionRequired} flag.
     *
     * @param grouped serdes and optional name for the grouping; must not be null
     * @throws NullPointerException if {@code grouped} is null
     */
    @Override
    public KGroupedStream<K, V> groupByKey(Grouped<K, V> grouped) {
        // Explicit check with the same message groupBy(selector, grouped) uses.
        Objects.requireNonNull(grouped, "grouped can't be null");
        final GroupedInternal<K, V> groupedInternal = new GroupedInternal<K, V>(grouped);
        return new KGroupedStreamImpl<K, V>(this.name, this.sourceNodes, groupedInternal, this.repartitionRequired, this.streamsGraphNode, this.builder);
    }
}

