/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Objects;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.kstream.internals.KTableAggregate;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableReduce;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

public class KGroupedTableImpl<K, V>
extends AbstractStream<K>
implements KGroupedTable<K, V> {
    private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
    private static final String REDUCE_NAME = "KTABLE-REDUCE-";
    protected final Serde<K> keySerde;
    protected final Serde<V> valSerde;

    public KGroupedTableImpl(KStreamBuilder topology, String name, String sourceName, Serde<K> keySerde, Serde<V> valSerde) {
        super(topology, name, Collections.singleton(sourceName));
        this.keySerde = keySerde;
        this.valSerde = valSerde;
    }

    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<K, V, T> adder, Aggregator<K, V, T> subtractor, Serde<T> aggValueSerde, String storeName) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(adder, "adder can't be null");
        Objects.requireNonNull(subtractor, "subtractor can't be null");
        Objects.requireNonNull(storeName, "storeName can't be null");
        KTableAggregate<K, V, T> aggregateSupplier = new KTableAggregate<K, V, T>(storeName, initializer, adder, subtractor);
        return this.doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, storeName);
    }

    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<K, V, T> adder, Aggregator<K, V, T> subtractor, String storeName) {
        return this.aggregate(initializer, adder, subtractor, null, storeName);
    }

    private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier, Serde<T> aggValueSerde, String functionName, String storeName) {
        String sinkName = this.topology.newName("KSTREAM-SINK-");
        String sourceName = this.topology.newName("KSTREAM-SOURCE-");
        String funcName = this.topology.newName(functionName);
        String topic = storeName + "-repartition";
        Serializer keySerializer = this.keySerde == null ? null : this.keySerde.serializer();
        Deserializer keyDeserializer = this.keySerde == null ? null : this.keySerde.deserializer();
        Serializer valueSerializer = this.valSerde == null ? null : this.valSerde.serializer();
        Deserializer valueDeserializer = this.valSerde == null ? null : this.valSerde.deserializer();
        ChangedSerializer changedValueSerializer = new ChangedSerializer(valueSerializer);
        ChangedDeserializer changedValueDeserializer = new ChangedDeserializer(valueDeserializer);
        StateStoreSupplier aggregateStore = Stores.create(storeName).withKeys(this.keySerde).withValues(aggValueSerde).persistent().enableCaching().build();
        this.topology.addInternalTopic(topic);
        this.topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
        this.topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
        this.topology.addProcessor(funcName, aggregateSupplier, sourceName);
        this.topology.addStateStore(aggregateStore, funcName);
        return new KTableImpl(this.topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeName);
    }

    @Override
    public KTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, String storeName) {
        Objects.requireNonNull(adder, "adder can't be null");
        Objects.requireNonNull(subtractor, "subtractor can't be null");
        Objects.requireNonNull(storeName, "storeName can't be null");
        KTableReduce aggregateSupplier = new KTableReduce(storeName, adder, subtractor);
        return this.doAggregate(aggregateSupplier, this.valSerde, REDUCE_NAME, storeName);
    }

    @Override
    public KTable<K, Long> count(String storeName) {
        return this.aggregate(new Initializer<Long>(){

            @Override
            public Long apply() {
                return 0L;
            }
        }, new Aggregator<K, V, Long>(){

            @Override
            public Long apply(K aggKey, V value, Long aggregate) {
                return aggregate + 1L;
            }
        }, new Aggregator<K, V, Long>(){

            @Override
            public Long apply(K aggKey, V value, Long aggregate) {
                return aggregate - 1L;
            }
        }, Serdes.Long(), storeName);
    }
}

