/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Objects;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KTableAggregate;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableReduce;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.KeyValueStore;

public class KGroupedTableImpl<K, V>
extends AbstractStream<K>
implements KGroupedTable<K, V> {
    private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
    private static final String REDUCE_NAME = "KTABLE-REDUCE-";
    protected final Serde<K> keySerde;
    protected final Serde<V> valSerde;
    private boolean isQueryable;
    private final Initializer<Long> countInitializer = new Initializer<Long>(){

        @Override
        public Long apply() {
            return 0L;
        }
    };
    private final Aggregator<K, V, Long> countAdder = new Aggregator<K, V, Long>(){

        @Override
        public Long apply(K aggKey, V value, Long aggregate) {
            return aggregate + 1L;
        }
    };
    private Aggregator<K, V, Long> countSubtractor = new Aggregator<K, V, Long>(){

        @Override
        public Long apply(K aggKey, V value, Long aggregate) {
            return aggregate - 1L;
        }
    };

    KGroupedTableImpl(InternalStreamsBuilder builder, String name, String sourceName, Serde<K> keySerde, Serde<V> valSerde) {
        super(builder, name, Collections.singleton(sourceName));
        this.keySerde = keySerde;
        this.valSerde = valSerde;
        this.isQueryable = true;
    }

    private void determineIsQueryable(String queryableStoreName) {
        if (queryableStoreName == null) {
            this.isQueryable = false;
        }
    }

    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> adder, Aggregator<? super K, ? super V, T> subtractor, Serde<T> aggValueSerde, String queryableStoreName) {
        this.determineIsQueryable(queryableStoreName);
        return this.aggregate(initializer, adder, subtractor, KGroupedTableImpl.keyValueStore(this.keySerde, aggValueSerde, this.getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
    }

    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> adder, Aggregator<? super K, ? super V, T> subtractor, Serde<T> aggValueSerde) {
        return this.aggregate(initializer, adder, subtractor, aggValueSerde, (String)null);
    }

    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> adder, Aggregator<? super K, ? super V, T> subtractor, String queryableStoreName) {
        this.determineIsQueryable(queryableStoreName);
        return this.aggregate(initializer, adder, subtractor, (Serde<T>)null, this.getOrCreateName(queryableStoreName, AGGREGATE_NAME));
    }

    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> adder, Aggregator<? super K, ? super V, T> subtractor) {
        return this.aggregate(initializer, adder, subtractor, (String)null);
    }

    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> adder, Aggregator<? super K, ? super V, T> subtractor, StateStoreSupplier<KeyValueStore> storeSupplier) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(adder, "adder can't be null");
        Objects.requireNonNull(subtractor, "subtractor can't be null");
        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
        KTableAggregate<? super K, ? super V, T> aggregateSupplier = new KTableAggregate<K, V, T>(storeSupplier.name(), initializer, adder, subtractor);
        return this.doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier);
    }

    private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier, String functionName, StateStoreSupplier<KeyValueStore> storeSupplier) {
        String sinkName = this.builder.newProcessorName("KSTREAM-SINK-");
        String sourceName = this.builder.newProcessorName("KSTREAM-SOURCE-");
        String funcName = this.builder.newProcessorName(functionName);
        this.buildAggregate(aggregateSupplier, storeSupplier.name() + "-repartition", funcName, sourceName, sinkName);
        this.builder.internalTopologyBuilder.addStateStore(storeSupplier, funcName);
        return new KTableImpl(this.builder, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), this.isQueryable);
    }

    private void buildAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier, String topic, String funcName, String sourceName, String sinkName) {
        Serializer<K> keySerializer = this.keySerde == null ? null : this.keySerde.serializer();
        Deserializer<K> keyDeserializer = this.keySerde == null ? null : this.keySerde.deserializer();
        Serializer<V> valueSerializer = this.valSerde == null ? null : this.valSerde.serializer();
        Deserializer<V> valueDeserializer = this.valSerde == null ? null : this.valSerde.deserializer();
        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<V>(valueSerializer);
        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<V>(valueDeserializer);
        this.builder.internalTopologyBuilder.addInternalTopic(topic);
        this.builder.internalTopologyBuilder.addSink(sinkName, topic, keySerializer, changedValueSerializer, null, this.name);
        this.builder.internalTopologyBuilder.addSource(null, sourceName, (TimestampExtractor)new FailOnInvalidTimestamp(), keyDeserializer, changedValueDeserializer, topic);
        this.builder.internalTopologyBuilder.addProcessor(funcName, aggregateSupplier, sourceName);
    }

    private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier, String functionName, MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
        String sinkName = this.builder.newProcessorName("KSTREAM-SINK-");
        String sourceName = this.builder.newProcessorName("KSTREAM-SOURCE-");
        String funcName = this.builder.newProcessorName(functionName);
        this.buildAggregate(aggregateSupplier, materialized.storeName() + "-repartition", funcName, sourceName, sinkName);
        this.builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<K, T>(materialized).materialize(), funcName);
        return new KTableImpl(this.builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), this.isQueryable);
    }

    @Override
    public KTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, String queryableStoreName) {
        this.determineIsQueryable(queryableStoreName);
        return this.reduce(adder, subtractor, KGroupedTableImpl.keyValueStore(this.keySerde, this.valSerde, this.getOrCreateName(queryableStoreName, REDUCE_NAME)));
    }

    @Override
    public KTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(adder, "adder can't be null");
        Objects.requireNonNull(subtractor, "subtractor can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, REDUCE_NAME);
        KTableReduce aggregateSupplier = new KTableReduce(materializedInternal.storeName(), adder, subtractor);
        return this.doAggregate(aggregateSupplier, REDUCE_NAME, materializedInternal);
    }

    @Override
    public KTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor) {
        return this.reduce(adder, subtractor, (String)null);
    }

    @Override
    public KTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, StateStoreSupplier<KeyValueStore> storeSupplier) {
        Objects.requireNonNull(adder, "adder can't be null");
        Objects.requireNonNull(subtractor, "subtractor can't be null");
        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
        KTableReduce aggregateSupplier = new KTableReduce(storeSupplier.name(), adder, subtractor);
        return this.doAggregate(aggregateSupplier, REDUCE_NAME, storeSupplier);
    }

    @Override
    public KTable<K, Long> count(String queryableStoreName) {
        this.determineIsQueryable(queryableStoreName);
        return this.count(KGroupedTableImpl.keyValueStore(this.keySerde, Serdes.Long(), this.getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
    }

    @Override
    public KTable<K, Long> count(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.aggregate(this.countInitializer, this.countAdder, this.countSubtractor, materialized);
    }

    @Override
    public <VR> KTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder, Aggregator<? super K, ? super V, VR> subtractor, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(adder, "adder can't be null");
        Objects.requireNonNull(subtractor, "subtractor can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, AGGREGATE_NAME);
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(this.keySerde);
        }
        KTableAggregate<? super K, ? super V, VR> aggregateSupplier = new KTableAggregate<K, V, VR>(materializedInternal.storeName(), initializer, adder, subtractor);
        return this.doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal);
    }

    @Override
    public KTable<K, Long> count() {
        return this.count((String)null);
    }

    @Override
    public KTable<K, Long> count(StateStoreSupplier<KeyValueStore> storeSupplier) {
        return this.aggregate((Initializer<T>)this.countInitializer, (Aggregator<? super K, ? super V, T>)this.countAdder, (Aggregator<? super K, ? super V, T>)this.countSubtractor, storeSupplier);
    }
}

