/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KGroupedTableImpl;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamMapValues;
import org.apache.kafka.streams.kstream.internals.KTableFilter;
import org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger;
import org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin;
import org.apache.kafka.streams.kstream.internals.KTableMapValues;
import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableRepartitionMap;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.KTableTransformValues;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.SerializedInternal;
import org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;

/**
 * {@link KTable} implementation in which every table operation adds one node to the
 * {@link InternalStreamsBuilder} graph (as a child of this table's node) and returns a
 * new table, stream or grouped table wrapping that node.
 *
 * <p>Instances carry one piece of mutable state, the {@code sendOldValues} flag, which is
 * flipped by {@link #enableSendingOldValues()}; no synchronization is performed here.
 *
 * @param <K> key type
 * @param <S> not referenced by any member of this class
 * @param <V> value type
 */
public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<K, V> {
    // Name prefixes. The private ones are passed to the builder's name generators below;
    // SOURCE_NAME and STATE_STORE_NAME are not used inside this class.
    static final String SOURCE_NAME = "KTABLE-SOURCE-";
    static final String STATE_STORE_NAME = "STATE-STORE-";
    private static final String FILTER_NAME = "KTABLE-FILTER-";
    private static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
    private static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
    private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
    private static final String MERGE_NAME = "KTABLE-MERGE-";
    private static final String SELECT_NAME = "KTABLE-SELECT-";
    private static final String SUPPRESS_NAME = "KTABLE-SUPPRESS-";
    private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
    private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";

    /** Supplier backing this table; its concrete type drives {@link #valueGetterSupplier()} and {@link #enableSendingOldValues()}. */
    private final ProcessorSupplier<?, ?> processorSupplier;
    /** Store name reported by {@link #queryableStoreName()} when {@link #isQueryable} is set. */
    private final String queryableStoreName;
    private final boolean isQueryable;
    /** Becomes {@code true} the first time {@link #enableSendingOldValues()} runs. */
    private boolean sendOldValues = false;

    /**
     * Creates a table view over an already registered graph node.
     *
     * @param name               processor name of this table
     * @param keySerde           key serde, forwarded to {@link AbstractStream}
     * @param valSerde           value serde, forwarded to {@link AbstractStream}
     * @param sourceNodes        source node names, forwarded to {@link AbstractStream}
     * @param queryableStoreName store name exposed when {@code isQueryable} is {@code true}
     * @param isQueryable        whether {@link #queryableStoreName()} should expose the store name
     * @param processorSupplier  supplier backing this table
     * @param streamsGraphNode   graph node of this table, forwarded to {@link AbstractStream}
     * @param builder            builder that owns the graph, forwarded to {@link AbstractStream}
     */
    public KTableImpl(String name, Serde<K> keySerde, Serde<V> valSerde, Set<String> sourceNodes, String queryableStoreName, boolean isQueryable, ProcessorSupplier<?, ?> processorSupplier, StreamsGraphNode streamsGraphNode, InternalStreamsBuilder builder) {
        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
        this.processorSupplier = processorSupplier;
        this.queryableStoreName = queryableStoreName;
        this.isQueryable = isQueryable;
    }

    /**
     * @return the store name given at construction, or {@code null} when this table was
     *         constructed as not queryable
     */
    @Override
    public String queryableStoreName() {
        if (!this.isQueryable) {
            return null;
        }
        return this.queryableStoreName;
    }

    /**
     * Validates a user supplied {@link Materialized}, wraps it and lets it generate a
     * store name from {@code storeNamePrefix} if it needs one.
     *
     * @throws NullPointerException if {@code materialized} is {@code null}
     */
    private <VR> MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> toMaterializedInternal(Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, String storeNamePrefix) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
        materializedInternal.generateStoreNameIfNeeded(this.builder, storeNamePrefix);
        return materializedInternal;
    }

    /** Returns the key serde configured on {@code materialized}, falling back to this table's key serde. */
    private Serde<K> keySerdeFor(MaterializedInternal<K, ?, ?> materialized) {
        return materialized != null && materialized.keySerde() != null ? materialized.keySerde() : this.keySerde;
    }

    /**
     * Shared implementation of {@code filter} / {@code filterNot}.
     *
     * @param predicate            predicate handed to {@link KTableFilter}
     * @param materializedInternal materialization settings, or {@code null} for none
     * @param filterNot            flag handed to {@link KTableFilter}; {@code true} for the {@code filterNot} variants
     * @return a table over the newly added filter node; it is queryable only when
     *         {@code materializedInternal} is non-null and reports itself queryable
     */
    private KTable<K, V> doFilter(Predicate<? super K, ? super V> predicate, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal, boolean filterNot) {
        String name = this.builder.newProcessorName(FILTER_NAME);
        boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();
        KTableFilter<? super K, ? super V> filterSupplier = new KTableFilter<K, V>(this, predicate, filterNot, shouldMaterialize ? materializedInternal.storeName() : null);
        // Raw construction, matching doMapValues/doTransformValues: the supplier's generic
        // arguments do not line up with the parameters type expected by the graph node.
        ProcessorParameters processorParameters = this.unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters(filterSupplier, name));
        TableProcessorNode tableNode = new TableProcessorNode(name, processorParameters, materializedInternal, null);
        this.builder.addGraphNode(this.streamsGraphNode, tableNode);
        // Filtering keeps the value type, so the value serde also falls back to this table's serde.
        return new KTableImpl<K, S, V>(name, this.keySerdeFor(materializedInternal), materializedInternal != null && materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : this.valSerde, this.sourceNodes, shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName, shouldMaterialize, filterSupplier, tableNode, this.builder);
    }

    /** @throws NullPointerException if {@code predicate} is {@code null} */
    @Override
    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        return this.doFilter(predicate, null, false);
    }

    /** @throws NullPointerException if {@code predicate} or {@code materialized} is {@code null} */
    @Override
    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        return this.doFilter(predicate, this.toMaterializedInternal(materialized, FILTER_NAME), false);
    }

    /** @throws NullPointerException if {@code predicate} is {@code null} */
    @Override
    public KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        return this.doFilter(predicate, null, true);
    }

    /** @throws NullPointerException if {@code predicate} or {@code materialized} is {@code null} */
    @Override
    public KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        return this.doFilter(predicate, this.toMaterializedInternal(materialized, FILTER_NAME), true);
    }

    /**
     * Shared implementation of the {@code mapValues} overloads.
     *
     * @param mapper               mapper handed to {@link KTableMapValues}
     * @param materializedInternal materialization settings, or {@code null} for none
     * @return a table over the newly added map-values node
     */
    private <VR> KTable<K, VR> doMapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) {
        String name = this.builder.newProcessorName(MAPVALUES_NAME);
        boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();
        KTableMapValues<? super K, ? super V, ? extends VR> mapValuesSupplier = new KTableMapValues<K, V, VR>(this, mapper, shouldMaterialize ? materializedInternal.storeName() : null);
        ProcessorParameters processorParameters = this.unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters(mapValuesSupplier, name));
        TableProcessorNode tableNode = new TableProcessorNode(name, processorParameters, materializedInternal, null);
        this.builder.addGraphNode(this.streamsGraphNode, tableNode);
        // The value type changes, so this table's value serde is never reused: the new
        // table gets the materialized value serde, or null when nothing was materialized.
        return new KTableImpl<K, S, VR>(name, this.keySerdeFor(materializedInternal), materializedInternal != null ? materializedInternal.valueSerde() : null, this.sourceNodes, shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName, shouldMaterialize, mapValuesSupplier, tableNode, this.builder);
    }

    /** @throws NullPointerException if {@code mapper} is {@code null} */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return this.doMapValues(KTableImpl.withKey(mapper), null);
    }

    /** @throws NullPointerException if {@code mapper} is {@code null} */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return this.doMapValues(mapper, null);
    }

    /** @throws NullPointerException if {@code mapper} or {@code materialized} is {@code null} */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = this.toMaterializedInternal(materialized, MAPVALUES_NAME);
        return this.doMapValues(KTableImpl.withKey(mapper), materializedInternal);
    }

    /** @throws NullPointerException if {@code mapper} or {@code materialized} is {@code null} */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return this.doMapValues(mapper, this.toMaterializedInternal(materialized, MAPVALUES_NAME));
    }

    /** @throws NullPointerException if {@code stateStoreNames} is {@code null} */
    @Override
    public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, String ... stateStoreNames) {
        return this.doTransformValues(transformerSupplier, null, stateStoreNames);
    }

    /** @throws NullPointerException if {@code materialized} or {@code stateStoreNames} is {@code null} */
    @Override
    public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, String ... stateStoreNames) {
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = this.toMaterializedInternal(materialized, TRANSFORMVALUES_NAME);
        return this.doTransformValues(transformerSupplier, materializedInternal, stateStoreNames);
    }

    /**
     * Shared implementation of the {@code transformValues} overloads.
     *
     * @param transformerSupplier supplier handed to {@link KTableTransformValues}; not checked for null here
     * @param materialized        materialization settings, or {@code null} for none
     * @param stateStoreNames     store names handed to the graph node
     * @throws NullPointerException if {@code stateStoreNames} is {@code null}
     */
    private <VR> KTable<K, VR> doTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized, String ... stateStoreNames) {
        Objects.requireNonNull(stateStoreNames, "stateStoreNames");
        String name = this.builder.newProcessorName(TRANSFORMVALUES_NAME);
        boolean shouldMaterialize = materialized != null && materialized.isQueryable();
        KTableTransformValues<? super K, ? super V, ? extends VR> transformSupplier = new KTableTransformValues<K, V, VR>(this, transformerSupplier, shouldMaterialize ? materialized.storeName() : null);
        ProcessorParameters processorParameters = this.unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters(transformSupplier, name));
        TableProcessorNode tableNode = new TableProcessorNode(name, processorParameters, materialized, stateStoreNames);
        this.builder.addGraphNode(this.streamsGraphNode, tableNode);
        // As in doMapValues, the value serde is taken only from the materialization settings.
        return new KTableImpl<K, S, VR>(name, this.keySerdeFor(materialized), materialized != null ? materialized.valueSerde() : null, this.sourceNodes, shouldMaterialize ? materialized.storeName() : this.queryableStoreName, shouldMaterialize, transformSupplier, tableNode, this.builder);
    }

    /**
     * Adds a node that maps every {@link Change} to its {@code newValue} and returns a
     * stream over that node, reusing this table's serdes and source nodes.
     */
    @Override
    public KStream<K, V> toStream() {
        String name = this.builder.newProcessorName(TOSTREAM_NAME);
        KStreamMapValues<Object, Change, Object> kStreamMapValues = new KStreamMapValues<Object, Change, Object>((key, change) -> change.newValue);
        ProcessorParameters processorParameters = this.unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters(kStreamMapValues, name));
        // NOTE(review): both `false` arguments below are presumably "repartition required" flags — confirm.
        ProcessorGraphNode toStreamNode = new ProcessorGraphNode(name, processorParameters, false);
        this.builder.addGraphNode(this.streamsGraphNode, toStreamNode);
        return new KStreamImpl(name, this.keySerde, this.valSerde, this.sourceNodes, false, toStreamNode, this.builder);
    }

    /** Equivalent to {@code toStream().selectKey(mapper)}. */
    @Override
    public <K1> KStream<K1, V> toStream(KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
        return this.toStream().selectKey(mapper);
    }

    /**
     * Adds a suppression node backed by an in-memory time-ordered buffer.
     *
     * <p>When the suppression carries a name, that name is used for the processor and
     * {@code name + "-store"} for the buffer store; otherwise both names are generated by
     * the builder. The returned table is not queryable and lists this table's name as its
     * only source node.
     *
     * @throws IllegalArgumentException if {@code suppressed} is not one of the built-in
     *         suppression types (this includes {@code null})
     */
    @Override
    public KTable<K, V> suppress(Suppressed<? super K> suppressed) {
        SuppressedInternal<K> suppressedInternal = this.buildSuppress(suppressed);
        String name = suppressedInternal.name() != null ? suppressedInternal.name() : this.builder.newProcessorName(SUPPRESS_NAME);
        String storeName = suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : this.builder.newStoreName(SUPPRESS_NAME);
        // The change serde is only built when this table has a value serde to wrap.
        ProcessorSupplier suppressionSupplier = () -> new KTableSuppressProcessor(suppressedInternal, storeName, this.keySerde, this.valSerde == null ? null : new FullChangeSerde(this.valSerde));
        StatefulProcessorNode node = new StatefulProcessorNode(name, new ProcessorParameters(suppressionSupplier, name), new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName), false);
        this.builder.addGraphNode(this.streamsGraphNode, node);
        return new KTableImpl<K, S, V>(name, this.keySerde, this.valSerde, Collections.singleton(this.name), null, false, suppressionSupplier, node, this.builder);
    }

    /**
     * Converts the public {@link Suppressed} into its internal form.
     *
     * <p>A {@link FinalResultsSuppressionBuilder} is completed with the grace period that
     * {@link GraphGraceSearchUtil} finds for this table's graph node; a
     * {@link SuppressedInternal} is returned as is.
     *
     * @throws IllegalArgumentException for any other argument; because {@code instanceof}
     *         is false for {@code null}, a null argument ends up here as well
     */
    private SuppressedInternal<K> buildSuppress(Suppressed<? super K> suppress) {
        if (suppress instanceof FinalResultsSuppressionBuilder) {
            long grace = GraphGraceSearchUtil.findAndVerifyWindowGrace(this.streamsGraphNode);
            // Named so it does not shadow the inherited `builder` field, which has a different type.
            FinalResultsSuppressionBuilder finalResultsBuilder = (FinalResultsSuppressionBuilder)suppress;
            SuppressedInternal finalResultsSuppression = finalResultsBuilder.buildFinalResultsSuppression(Duration.ofMillis(grace));
            return finalResultsSuppression;
        }
        if (suppress instanceof SuppressedInternal) {
            return (SuppressedInternal)suppress;
        }
        throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed.");
    }

    /** Inner join without materialization; see {@link #doJoin}. */
    @Override
    public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
        return this.doJoin(other, joiner, null, false, false);
    }

    /** Inner join with materialization; see {@link #doJoin}. */
    @Override
    public <VO, VR> KTable<K, VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.doJoin(other, joiner, this.toMaterializedInternal(materialized, MERGE_NAME), false, false);
    }

    /** Outer join without materialization; see {@link #doJoin}. */
    @Override
    public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
        return this.doJoin(other, joiner, null, true, true);
    }

    /** Outer join with materialization; see {@link #doJoin}. */
    @Override
    public <VO, VR> KTable<K, VR> outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.doJoin(other, joiner, this.toMaterializedInternal(materialized, MERGE_NAME), true, true);
    }

    /** Left join without materialization; see {@link #doJoin}. */
    @Override
    public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
        return this.doJoin(other, joiner, null, true, false);
    }

    /** Left join with materialization; see {@link #doJoin}. */
    @Override
    public <VO, VR> KTable<K, VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.doJoin(other, joiner, this.toMaterializedInternal(materialized, MERGE_NAME), true, false);
    }

    /**
     * Validates the join arguments, allocates the merge processor name and delegates to
     * {@link #buildJoin}.
     *
     * @param materializedInternal materialization settings, or {@code null}; its store name
     *                             (possibly {@code null}) becomes the result's queryable name
     * @throws NullPointerException if {@code other} or {@code joiner} is {@code null}
     */
    private <VO, VR> KTable<K, VR> doJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal, boolean leftOuter, boolean rightOuter) {
        Objects.requireNonNull(other, "other can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        String internalQueryableName = materializedInternal == null ? null : materializedInternal.storeName();
        String joinMergeName = this.builder.newProcessorName(MERGE_NAME);
        return this.buildJoin((AbstractStream)((Object)other), joiner, leftOuter, rightOuter, joinMergeName, internalQueryableName, materializedInternal);
    }

    /**
     * Builds the join-this / join-other / merge processors and registers them as a single
     * {@link KTableKTableJoinNode}.
     *
     * <p>Processor selection: {@code leftOuter == false} gives an inner join (in that case
     * {@code rightOuter} does not influence the choice), {@code leftOuter && !rightOuter} a
     * left join on this side paired with a right join on the other side, and both flags
     * set an outer join. Old-value forwarding is enabled on this table when
     * {@code leftOuter} is set and on {@code other} when {@code rightOuter} is set.
     *
     * @param other must actually be a {@link KTableImpl}; it is cast unconditionally
     * @return a table over the merge node, queryable iff {@code internalQueryableName} is non-null
     */
    private <V1, R> KTable<K, R> buildJoin(AbstractStream<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, boolean leftOuter, boolean rightOuter, String joinMergeName, String internalQueryableName, MaterializedInternal materializedInternal) {
        KTableKTableAbstractJoin joinOther;
        KTableKTableAbstractJoin joinThis;
        Set<String> allSourceNodes = this.ensureJoinableWith(other);
        if (leftOuter) {
            this.enableSendingOldValues();
        }
        if (rightOuter) {
            ((KTableImpl)other).enableSendingOldValues();
        }
        String joinThisName = this.builder.newProcessorName(JOINTHIS_NAME);
        String joinOtherName = this.builder.newProcessorName(JOINOTHER_NAME);
        // The other side always sees the joiner with its arguments swapped.
        if (!leftOuter) {
            joinThis = new KTableKTableInnerJoin(this, (KTableImpl)other, joiner);
            joinOther = new KTableKTableInnerJoin((KTableImpl)other, this, KTableImpl.reverseJoiner(joiner));
        } else if (!rightOuter) {
            joinThis = new KTableKTableLeftJoin(this, (KTableImpl)other, joiner);
            joinOther = new KTableKTableRightJoin((KTableImpl)other, this, KTableImpl.reverseJoiner(joiner));
        } else {
            joinThis = new KTableKTableOuterJoin(this, (KTableImpl)other, joiner);
            joinOther = new KTableKTableOuterJoin((KTableImpl)other, this, KTableImpl.reverseJoiner(joiner));
        }
        KTableKTableJoinMerger joinMerge = new KTableKTableJoinMerger(joinThis, joinOther, internalQueryableName);
        KTableKTableJoinNode.KTableKTableJoinNodeBuilder kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder();
        if (materializedInternal != null) {
            kTableJoinNodeBuilder.withMaterializedInternal(materializedInternal);
        }
        kTableJoinNodeBuilder.withNodeName(joinMergeName);
        ProcessorParameters joinThisProcessorParameters = new ProcessorParameters(joinThis, joinThisName);
        ProcessorParameters joinOtherProcessorParameters = new ProcessorParameters(joinOther, joinOtherName);
        ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName);
        kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters).withJoinOtherProcessorParameters(joinOtherProcessorParameters).withJoinThisProcessorParameters(joinThisProcessorParameters).withJoinThisStoreNames(this.valueGetterSupplier().storeNames()).withJoinOtherStoreNames(((KTableImpl)other).valueGetterSupplier().storeNames()).withOtherJoinSideNodeName(((KTableImpl)other).name).withThisJoinSideNodeName(this.name);
        KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build();
        this.builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode);
        return new KTableImpl(joinMergeName, this.keySerdeFor(materializedInternal), materializedInternal != null ? materializedInternal.valueSerde() : null, allSourceNodes, internalQueryableName, internalQueryableName != null, joinMerge, kTableKTableJoinNode, this.builder);
    }

    /** Groups with {@code Grouped.with(null, null)}, i.e. without explicit serdes. */
    @Override
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
        return this.groupBy(selector, Grouped.with(null, null));
    }

    /**
     * Adapts the deprecated {@link Serialized} argument to a {@link Grouped} carrying the
     * same key and value serdes.
     *
     * @throws NullPointerException if {@code selector} or {@code serialized} is {@code null}
     */
    @Override
    @Deprecated
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector, Serialized<K1, V1> serialized) {
        Objects.requireNonNull(selector, "selector can't be null");
        Objects.requireNonNull(serialized, "serialized can't be null");
        SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<K1, V1>(serialized);
        return this.groupBy(selector, Grouped.with(serializedInternal.keySerde(), serializedInternal.valueSerde()));
    }

    /**
     * Adds a {@link KTableRepartitionMap} node for {@code selector}, enables old-value
     * forwarding on this table and returns the grouped table over the new node.
     *
     * @throws NullPointerException if {@code selector} or {@code grouped} is {@code null}
     */
    @Override
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector, Grouped<K1, V1> grouped) {
        Objects.requireNonNull(selector, "selector can't be null");
        Objects.requireNonNull(grouped, "grouped can't be null");
        String selectName = this.builder.newProcessorName(SELECT_NAME);
        KTableRepartitionMap<? super K, ? super V, K1, V1> selectSupplier = new KTableRepartitionMap<K, V, K1, V1>(this, selector);
        ProcessorParameters processorParameters = new ProcessorParameters(selectSupplier, selectName);
        ProcessorGraphNode groupByMapNode = new ProcessorGraphNode(selectName, processorParameters, false);
        this.builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
        this.enableSendingOldValues();
        GroupedInternal<K1, V1> groupedInternal = new GroupedInternal<K1, V1>(grouped);
        return new KGroupedTableImpl<K1, V1>(this.builder, selectName, this.sourceNodes, groupedInternal, groupByMapNode);
    }

    /**
     * Returns a value getter supplier matching the concrete type of this table's processor
     * supplier: a store-backed one for {@link KTableSource}, otherwise the supplier's own
     * {@code view()}.
     *
     * <p>NOTE(review): any supplier that is neither a {@link KTableSource} nor a
     * {@link KStreamAggProcessorSupplier} is cast to {@link KTableProcessorSupplier}. The
     * supplier created in {@code suppress} is a plain lambda, so this cast looks like it
     * would fail with a {@link ClassCastException} for suppressed tables — confirm.
     */
    KTableValueGetterSupplier<K, V> valueGetterSupplier() {
        if (this.processorSupplier instanceof KTableSource) {
            KTableSource source = (KTableSource)this.processorSupplier;
            return new KTableSourceValueGetterSupplier(source.storeName);
        }
        if (this.processorSupplier instanceof KStreamAggProcessorSupplier) {
            return ((KStreamAggProcessorSupplier)this.processorSupplier).view();
        }
        return ((KTableProcessorSupplier)this.processorSupplier).view();
    }

    /**
     * Asks the underlying processor supplier to forward old values; does nothing if this
     * was already done for this table. The same fallback-cast caveat as in
     * {@link #valueGetterSupplier()} applies.
     */
    void enableSendingOldValues() {
        if (!this.sendOldValues) {
            if (this.processorSupplier instanceof KTableSource) {
                KTableSource source = (KTableSource)this.processorSupplier;
                source.enableSendingOldValues();
            } else if (this.processorSupplier instanceof KStreamAggProcessorSupplier) {
                ((KStreamAggProcessorSupplier)this.processorSupplier).enableSendingOldValues();
            } else {
                ((KTableProcessorSupplier)this.processorSupplier).enableSendingOldValues();
            }
            this.sendOldValues = true;
        }
    }

    /** @return whether {@link #enableSendingOldValues()} has been called on this table */
    boolean sendingOldValueEnabled() {
        return this.sendOldValues;
    }

    /**
     * Re-types processor parameters whose declared value type is {@code Change<V>} so they
     * can be handed to graph nodes declared with a different value type.
     *
     * <p>The cast is unchecked and purely a compile-time re-labelling; the same instance is
     * returned.
     */
    @SuppressWarnings("unchecked")
    private <VR> ProcessorParameters<K, VR> unsafeCastProcessorParametersToCompletelyDifferentType(ProcessorParameters<K, Change<V>> kObjectProcessorParameters) {
        return (ProcessorParameters<K, VR>) kObjectProcessorParameters;
    }
}

