/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.lang.reflect.Array;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.KStreamAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamBranch;
import org.apache.kafka.streams.kstream.internals.KStreamFilter;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMap;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamForeach;
import org.apache.kafka.streams.kstream.internals.KStreamJoinWindow;
import org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin;
import org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin;
import org.apache.kafka.streams.kstream.internals.KStreamMap;
import org.apache.kafka.streams.kstream.internals.KStreamMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamPassThrough;
import org.apache.kafka.streams.kstream.internals.KStreamReduce;
import org.apache.kafka.streams.kstream.internals.KStreamTransform;
import org.apache.kafka.streams.kstream.internals.KStreamTransformValues;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowReduce;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KeyValuePrinter;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;

/**
 * {@link KStream} implementation that records every DSL call as processor, sink and
 * state-store nodes on the shared {@link KStreamBuilder} topology.
 *
 * <p>Each operation asks the builder for a fresh node name via {@code newName(prefix)},
 * registers its node(s) with this stream's node ({@code this.name}) as parent, and, when it
 * produces a stream or table, wraps the new node name in a new {@code KStreamImpl} /
 * {@code KTableImpl}.
 *
 * <p>The {@code sourceNodes} set inherited from {@link AbstractStream} is forwarded unchanged
 * by most operations and replaced with {@code null} by {@link #map}, {@link #flatMap} and
 * {@link #transform}, which may emit arbitrary new keys. The join methods obtain their set
 * from {@code ensureJoinableWith}. (Presumably a {@code null} set marks a stream that can no
 * longer be joined; that check lives in {@code AbstractStream} and is not visible here.)
 *
 * @param <K> key type of the records in this stream
 * @param <V> value type of the records in this stream
 */
public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {

    // Prefixes handed to KStreamBuilder#newName to generate the names of the nodes created below.
    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
    private static final String FILTER_NAME = "KSTREAM-FILTER-";
    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
    public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
    public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
    private static final String MAP_NAME = "KSTREAM-MAP-";
    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
    public static final String MERGE_NAME = "KSTREAM-MERGE-";
    public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
    private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
    public static final String SINK_NAME = "KSTREAM-SINK-";
    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
    private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";

    /**
     * Creates a stream handle for an existing topology node.
     *
     * @param topology    builder that owns the node and receives all nodes added later
     * @param name        name of the node this stream represents
     * @param sourceNodes source-node names associated with this stream; may be {@code null}
     */
    public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
        super(topology, name, sourceNodes);
    }

    /**
     * Adds a {@link KStreamFilter} node built with the flag {@code false} and returns the
     * stream rooted at it.
     *
     * @param predicate predicate handed to the filter processor
     * @return stream backed by the new filter node, with the same source nodes
     */
    @Override
    public KStream<K, V> filter(Predicate<K, V> predicate) {
        String name = topology.newName(FILTER_NAME);
        topology.addProcessor(name, new KStreamFilter<K, V>(predicate, false), this.name);
        return new KStreamImpl<K, V>(topology, name, sourceNodes);
    }

    /**
     * Same as {@link #filter(Predicate)} but builds the {@link KStreamFilter} with the flag
     * {@code true} (presumably the "negate" switch).
     *
     * @param predicate predicate handed to the filter processor
     * @return stream backed by the new filter node, with the same source nodes
     */
    @Override
    public KStream<K, V> filterNot(Predicate<K, V> predicate) {
        String name = topology.newName(FILTER_NAME);
        topology.addProcessor(name, new KStreamFilter<K, V>(predicate, true), this.name);
        return new KStreamImpl<K, V>(topology, name, sourceNodes);
    }

    /**
     * Re-keys the stream: wraps {@code mapper} in a {@link KeyValueMapper} that pairs the
     * computed key with the untouched value and runs it in a {@link KStreamMap} node.
     *
     * @param mapper computes the new key from the current key and value
     * @param <K1>   new key type
     * @return stream backed by the new node
     */
    @Override
    public <K1> KStream<K1, V> selectKey(final KeyValueMapper<K, V, K1> mapper) {
        String name = topology.newName(KEY_SELECT_NAME);
        KeyValueMapper<K, V, KeyValue<K1, V>> rekey = new KeyValueMapper<K, V, KeyValue<K1, V>>() {
            @Override
            public KeyValue<K1, V> apply(K key, V value) {
                return new KeyValue<K1, V>(mapper.apply(key, value), value);
            }
        };
        topology.addProcessor(name, new KStreamMap<K, V, K1, V>(rekey), this.name);
        // NOTE(review): map/flatMap/transform pass null source nodes because they may change
        // the key; this method also changes the key yet forwards sourceNodes. Behaviour is
        // kept as-is here -- confirm whether a re-keyed stream should remain joinable.
        return new KStreamImpl<K1, V>(topology, name, sourceNodes);
    }

    /**
     * Adds a {@link KStreamMap} node. The result carries {@code null} source nodes.
     *
     * @param mapper maps each record to a new key-value pair
     * @param <K1>   new key type
     * @param <V1>   new value type
     * @return stream backed by the new map node
     */
    @Override
    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
        String name = topology.newName(MAP_NAME);
        topology.addProcessor(name, new KStreamMap<K, V, K1, V1>(mapper), this.name);
        return new KStreamImpl<K1, V1>(topology, name, null);
    }

    /**
     * Adds a {@link KStreamMapValues} node; keys and source nodes are carried over.
     *
     * @param mapper maps each value to a new value
     * @param <V1>   new value type
     * @return stream backed by the new node
     */
    @Override
    public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
        String name = topology.newName(MAPVALUES_NAME);
        topology.addProcessor(name, new KStreamMapValues(mapper), this.name);
        return new KStreamImpl<K, V1>(topology, name, sourceNodes);
    }

    /** Equivalent to {@code print(null, null)}. */
    @Override
    public void print() {
        print(null, null);
    }

    /**
     * Adds a terminal {@link KeyValuePrinter} node constructed without an explicit output
     * stream.
     *
     * @param keySerde key serde handed to the printer; may be {@code null}
     * @param valSerde value serde handed to the printer; may be {@code null}
     */
    @Override
    public void print(Serde<K> keySerde, Serde<V> valSerde) {
        String name = topology.newName(PRINTING_NAME);
        topology.addProcessor(name, new KeyValuePrinter(keySerde, valSerde), this.name);
    }

    /** Equivalent to {@code writeAsText(filePath, null, null)}. */
    @Override
    public void writeAsText(String filePath) {
        writeAsText(filePath, null, null);
    }

    /**
     * Opens {@code filePath} for writing and adds a terminal {@link KeyValuePrinter} node that
     * prints to it. The stream is handed to the printer and is not closed by this method
     * unless registering the node fails.
     *
     * @param filePath path of the output file
     * @param keySerde key serde handed to the printer; may be {@code null}
     * @param valSerde value serde handed to the printer; may be {@code null}
     * @throws TopologyBuilderException if the file cannot be opened; the underlying
     *                                  {@link FileNotFoundException} is attached as the cause
     */
    @Override
    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
        String name = topology.newName(PRINTING_NAME);
        PrintStream printStream;
        try {
            printStream = new PrintStream(new FileOutputStream(filePath));
        } catch (FileNotFoundException e) {
            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
            TopologyBuilderException failure = new TopologyBuilderException(message);
            // Keep the original stack trace available to whoever diagnoses the failure.
            failure.initCause(e);
            throw failure;
        }
        try {
            topology.addProcessor(name, new KeyValuePrinter(printStream, keySerde, valSerde), this.name);
        } catch (RuntimeException e) {
            // The printer never took ownership, so release the file handle opened above.
            printStream.close();
            throw e;
        }
    }

    /**
     * Adds a {@link KStreamFlatMap} node. The result carries {@code null} source nodes.
     *
     * @param mapper maps each record to zero or more key-value pairs
     * @param <K1>   new key type
     * @param <V1>   new value type
     * @return stream backed by the new node
     */
    @Override
    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
        String name = topology.newName(FLATMAP_NAME);
        topology.addProcessor(name, new KStreamFlatMap<K, V, K1, V1>(mapper), this.name);
        return new KStreamImpl<K1, V1>(topology, name, null);
    }

    /**
     * Adds a {@link KStreamFlatMapValues} node; keys and source nodes are carried over.
     *
     * @param mapper maps each value to zero or more new values
     * @param <V1>   new value type
     * @return stream backed by the new node
     */
    @Override
    public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
        String name = topology.newName(FLATMAPVALUES_NAME);
        topology.addProcessor(name, new KStreamFlatMapValues(mapper), this.name);
        return new KStreamImpl<K, V1>(topology, name, sourceNodes);
    }

    /**
     * Adds one {@link KStreamBranch} node (given a copy of {@code predicates}) and, beneath
     * it, one {@link KStreamPassThrough} child per predicate.
     *
     * @param predicates branch predicates, one per resulting stream
     * @return array of the same length as {@code predicates}; element {@code i} is the stream
     *         backed by the {@code i}-th child node
     */
    @Override
    @SuppressWarnings("unchecked") // generic array creation via reflection
    public KStream<K, V>[] branch(Predicate<K, V> ... predicates) {
        String branchName = topology.newName(BRANCH_NAME);
        topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);

        KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
        for (int i = 0; i < predicates.length; ++i) {
            String childName = topology.newName(BRANCHCHILD_NAME);
            topology.addProcessor(childName, new KStreamPassThrough(), branchName);
            branchChildren[i] = new KStreamImpl<K, V>(topology, childName, sourceNodes);
        }
        return branchChildren;
    }

    /**
     * Merges several streams by adding one {@link KStreamPassThrough} node whose parents are
     * the nodes of all given streams.
     *
     * <p>The result's source nodes are the union of the inputs' source nodes, or {@code null}
     * as soon as any input has {@code null} source nodes.
     *
     * @param topology builder to add the merge node to
     * @param streams  streams to merge; each must be a {@code KStreamImpl}
     * @param <K>      key type
     * @param <V>      value type
     * @return stream backed by the merge node
     */
    public static <K, V> KStream<K, V> merge(KStreamBuilder topology, KStream<K, V>[] streams) {
        String name = topology.newName(MERGE_NAME);
        String[] parentNames = new String[streams.length];
        Set<String> allSourceNodes = new HashSet<String>();

        for (int i = 0; i < streams.length; ++i) {
            KStreamImpl<K, V> stream = (KStreamImpl<K, V>) streams[i];
            parentNames[i] = stream.name;

            // Once one input lacks source nodes the merged set stays null for good.
            if (allSourceNodes != null) {
                if (stream.sourceNodes != null) {
                    allSourceNodes.addAll(stream.sourceNodes);
                } else {
                    allSourceNodes = null;
                }
            }
        }

        topology.addProcessor(name, new KStreamPassThrough(), parentNames);
        return new KStreamImpl<K, V>(topology, name, allSourceNodes);
    }

    /**
     * Writes this stream to {@code topic} via {@link #to(Serde, Serde, StreamPartitioner, String)}
     * and returns a new stream obtained from {@code topology.stream(keySerde, valSerde, topic)}.
     *
     * @param keySerde    key serde; may be {@code null}
     * @param valSerde    value serde; may be {@code null}
     * @param partitioner partitioner for the sink; may be {@code null}
     * @param topic       topic name
     * @return stream reading from {@code topic}
     */
    @Override
    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
        to(keySerde, valSerde, partitioner, topic);
        return topology.stream(keySerde, valSerde, topic);
    }

    /**
     * Adds a terminal {@link KStreamForeach} node.
     *
     * @param action action handed to the processor
     */
    @Override
    public void foreach(ForeachAction<K, V> action) {
        String name = topology.newName(FOREACH_NAME);
        topology.addProcessor(name, new KStreamForeach<K, V>(action), this.name);
    }

    /** Delegates to the four-argument {@code through} with a {@code null} partitioner. */
    @Override
    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        return through(keySerde, valSerde, null, topic);
    }

    /** Delegates to the four-argument {@code through} with {@code null} serdes. */
    @Override
    public KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
        return through(null, null, partitioner, topic);
    }

    /** Delegates to the four-argument {@code through} with {@code null} serdes and partitioner. */
    @Override
    public KStream<K, V> through(String topic) {
        return through(null, null, null, topic);
    }

    /** Delegates to the four-argument {@code to} with {@code null} serdes and partitioner. */
    @Override
    public void to(String topic) {
        to(null, null, null, topic);
    }

    /** Delegates to the four-argument {@code to} with {@code null} serdes. */
    @Override
    public void to(StreamPartitioner<K, V> partitioner, String topic) {
        to(null, null, partitioner, topic);
    }

    /** Delegates to the four-argument {@code to} with a {@code null} partitioner. */
    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        to(keySerde, valSerde, null, topic);
    }

    /**
     * Adds a sink node writing this stream to {@code topic}.
     *
     * <p>Serializers are taken from the serdes ({@code null} serde gives a {@code null}
     * serializer). When no partitioner is supplied and the key serializer is a
     * {@link WindowedSerializer}, a {@link WindowedStreamPartitioner} built from that
     * serializer is used.
     *
     * @param keySerde    key serde; may be {@code null}
     * @param valSerde    value serde; may be {@code null}
     * @param partitioner partitioner; may be {@code null}
     * @param topic       topic name
     */
    @Override
    @SuppressWarnings("unchecked") // raw WindowedStreamPartitioner assigned to StreamPartitioner<K, V>
    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
        String name = topology.newName(SINK_NAME);
        Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
        Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();

        // instanceof is already false for null, so no separate null check is needed.
        if (partitioner == null && keySerializer instanceof WindowedSerializer) {
            partitioner = new WindowedStreamPartitioner((WindowedSerializer) keySerializer);
        }

        topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
    }

    /**
     * Adds a {@link KStreamTransform} node and connects it to the named state stores. The
     * result carries {@code null} source nodes.
     *
     * @param transformerSupplier supplier handed to the processor
     * @param stateStoreNames     names of the state stores to connect to the new node
     * @param <K1>                new key type
     * @param <V1>                new value type
     * @return stream backed by the new node
     */
    @Override
    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String ... stateStoreNames) {
        String name = topology.newName(TRANSFORM_NAME);
        topology.addProcessor(name, new KStreamTransform<K, V, K1, V1>(transformerSupplier), this.name);
        topology.connectProcessorAndStateStores(name, stateStoreNames);
        return new KStreamImpl<K1, V1>(topology, name, null);
    }

    /**
     * Adds a {@link KStreamTransformValues} node and connects it to the named state stores;
     * keys and source nodes are carried over.
     *
     * @param valueTransformerSupplier supplier handed to the processor
     * @param stateStoreNames          names of the state stores to connect to the new node
     * @param <V1>                     new value type
     * @return stream backed by the new node
     */
    @Override
    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String ... stateStoreNames) {
        String name = topology.newName(TRANSFORMVALUES_NAME);
        topology.addProcessor(name, new KStreamTransformValues(valueTransformerSupplier), this.name);
        topology.connectProcessorAndStateStores(name, stateStoreNames);
        return new KStreamImpl<K, V1>(topology, name, sourceNodes);
    }

    /**
     * Adds the caller's processor as a terminal node and connects it to the named state
     * stores.
     *
     * @param processorSupplier supplier registered as the node's processor
     * @param stateStoreNames   names of the state stores to connect to the new node
     */
    @Override
    public void process(ProcessorSupplier<K, V> processorSupplier, String ... stateStoreNames) {
        String name = topology.newName(PROCESSOR_NAME);
        topology.addProcessor(name, processorSupplier, this.name);
        topology.connectProcessorAndStateStores(name, stateStoreNames);
    }

    /** Windowed stream-stream join with explicit serdes; calls the private join with {@code outer = false}. */
    @Override
    public <V1, R> KStream<K, R> join(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde) {
        return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false);
    }

    /** Windowed stream-stream join with {@code null} serdes; calls the private join with {@code outer = false}. */
    @Override
    public <V1, R> KStream<K, R> join(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows) {
        return join(other, joiner, windows, null, null, null, false);
    }

    /** Windowed stream-stream join with explicit serdes; calls the private join with {@code outer = true}. */
    @Override
    public <V1, R> KStream<K, R> outerJoin(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde) {
        return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true);
    }

    /** Windowed stream-stream join with {@code null} serdes; calls the private join with {@code outer = true}. */
    @Override
    public <V1, R> KStream<K, R> outerJoin(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows) {
        return join(other, joiner, windows, null, null, null, true);
    }

    /**
     * Shared implementation of {@code join} and {@code outerJoin}.
     *
     * <p>Nodes added, in this order: a {@link KStreamJoinWindow} under each input stream, a
     * {@link KStreamKStreamJoin} under each window node (the one on the other side uses the
     * reversed joiner), and a {@link KStreamPassThrough} merging both join nodes. Two
     * windowed stores named {@code windows.name() + "-this"} / {@code "-other"} are
     * registered with both window nodes.
     *
     * @param other           stream to join with; must be a {@code KStreamImpl}
     * @param joiner          combines a value of this stream with a value of {@code other}
     * @param windows         join window specification
     * @param keySerde        key serde for both stores; may be {@code null}
     * @param thisValueSerde  value serde for this side's store; may be {@code null}
     * @param otherValueSerde value serde for the other side's store; may be {@code null}
     * @param outer           flag forwarded to both join processors; also selects the
     *                        OUTERTHIS/OUTEROTHER rather than JOINTHIS/JOINOTHER name prefixes
     * @return stream backed by the merge node, with the source nodes returned by
     *         {@code ensureJoinableWith}
     */
    @SuppressWarnings("unchecked") // cast of the KStream interface to AbstractStream<K>
    private <V1, R> KStream<K, R> join(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde, boolean outer) {
        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);

        StateStoreSupplier thisWindow = windowedJoinStore(windows.name() + "-this", keySerde, thisValueSerde, windows);
        StateStoreSupplier otherWindow = windowedJoinStore(windows.name() + "-other", keySerde, otherValueSerde, windows);

        KStreamJoinWindow thisWindowedStream = new KStreamJoinWindow(thisWindow.name(), windows.before + windows.after + 1L, windows.maintainMs());
        KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow(otherWindow.name(), windows.before + windows.after + 1L, windows.maintainMs());

        // Each side looks up matches in the opposite side's window store.
        KStreamKStreamJoin joinThis = new KStreamKStreamJoin(otherWindow.name(), windows.before, windows.after, joiner, outer);
        KStreamKStreamJoin joinOther = new KStreamKStreamJoin(thisWindow.name(), windows.before, windows.after, reverseJoiner(joiner), outer);
        KStreamPassThrough joinMerge = new KStreamPassThrough();

        // Name generation order is kept stable: window/window/this/other/merge.
        String thisWindowStreamName = topology.newName(WINDOWED_NAME);
        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
        String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
        String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
        String joinMergeName = topology.newName(MERGE_NAME);

        topology.addProcessor(thisWindowStreamName, thisWindowedStream, this.name);
        topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name);
        topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
        topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
        topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName);
        topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName);

        return new KStreamImpl<K, R>(topology, joinMergeName, allSourceNodes);
    }

    /**
     * Windowed stream-stream left join.
     *
     * <p>Adds a {@link KStreamJoinWindow} under {@code other} and a {@link KStreamKStreamJoin}
     * (built with its last flag set to {@code true}) directly under this stream. One windowed
     * store named {@code windows.name() + "-other"} is registered with both nodes.
     *
     * @param other           stream to join with; must be a {@code KStreamImpl}
     * @param joiner          combines a value of this stream with a value of {@code other}
     * @param windows         join window specification
     * @param keySerde        key serde for the store; may be {@code null}
     * @param otherValueSerde value serde for the store; may be {@code null}
     * @return stream backed by the join node, with the source nodes returned by
     *         {@code ensureJoinableWith}
     */
    @Override
    @SuppressWarnings("unchecked") // cast of the KStream interface to AbstractStream<K>
    public <V1, R> KStream<K, R> leftJoin(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V1> otherValueSerde) {
        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);

        StateStoreSupplier otherWindow = windowedJoinStore(windows.name() + "-other", keySerde, otherValueSerde, windows);
        KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow(otherWindow.name(), windows.before + windows.after + 1L, windows.maintainMs());
        KStreamKStreamJoin joinThis = new KStreamKStreamJoin(otherWindow.name(), windows.before, windows.after, joiner, true);

        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
        String joinThisName = topology.newName(LEFTJOIN_NAME);

        topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name);
        topology.addProcessor(joinThisName, joinThis, this.name);
        topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);

        return new KStreamImpl<K, R>(topology, joinThisName, allSourceNodes);
    }

    /** Windowed stream-stream left join with {@code null} serdes. */
    @Override
    public <V1, R> KStream<K, R> leftJoin(KStream<K, V1> other, ValueJoiner<V, V1, R> joiner, JoinWindows windows) {
        return leftJoin(other, joiner, windows, null, null);
    }

    /**
     * Stream-table left join: adds a {@link KStreamKTableLeftJoin} node under this stream and
     * calls {@code connectProcessors} with this stream's node and the table's node.
     *
     * @param other  table to join with; must be a {@code KTableImpl}
     * @param joiner combines a stream value with a table value
     * @return stream backed by the join node, with the source nodes returned by
     *         {@code ensureJoinableWith}
     */
    @Override
    @SuppressWarnings("unchecked") // cast of the KTable interface to AbstractStream<K>
    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
        String name = topology.newName(LEFTJOIN_NAME);
        topology.addProcessor(name, new KStreamKTableLeftJoin((KTableImpl) other, joiner), this.name);
        topology.connectProcessors(this.name, ((KTableImpl) other).name);
        return new KStreamImpl<K, R>(topology, name, allSourceNodes);
    }

    /**
     * Windowed reduce: adds a {@link KStreamWindowReduce} node backed by a persistent
     * windowed store named {@code windows.name()}.
     *
     * @param reducer       reducer handed to the processor
     * @param windows       window specification; also supplies the store name
     * @param keySerde      key serde for the store; may be {@code null}
     * @param aggValueSerde value serde for the store; may be {@code null}
     * @return table backed by the reduce node
     */
    @Override
    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows, Serde<K> keySerde, Serde<V> aggValueSerde) {
        String reduceName = topology.newName(REDUCE_NAME);
        KStreamWindowReduce reduceSupplier = new KStreamWindowReduce(windows, windows.name(), reducer);
        StateStoreSupplier reduceStore = Stores.create(windows.name())
                .withKeys(keySerde)
                .withValues(aggValueSerde)
                .persistent()
                .windowed(windows.maintainMs(), windows.segments, false)
                .build();
        topology.addProcessor(reduceName, reduceSupplier, this.name);
        topology.addStateStore(reduceStore, reduceName);
        return new KTableImpl(topology, reduceName, reduceSupplier, sourceNodes);
    }

    /** Windowed reduce with {@code null} serdes. */
    @Override
    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows) {
        return reduceByKey(reducer, windows, null, null);
    }

    /**
     * Non-windowed reduce: adds a {@link KStreamReduce} node backed by a persistent store
     * named {@code name}.
     *
     * @param reducer       reducer handed to the processor
     * @param keySerde      key serde for the store; may be {@code null}
     * @param aggValueSerde value serde for the store; may be {@code null}
     * @param name          store name
     * @return table backed by the reduce node
     */
    @Override
    public KTable<K, V> reduceByKey(Reducer<V> reducer, Serde<K> keySerde, Serde<V> aggValueSerde, String name) {
        String reduceName = topology.newName(REDUCE_NAME);
        KStreamReduce reduceSupplier = new KStreamReduce(name, reducer);
        StateStoreSupplier reduceStore = Stores.create(name)
                .withKeys(keySerde)
                .withValues(aggValueSerde)
                .persistent()
                .build();
        topology.addProcessor(reduceName, reduceSupplier, this.name);
        topology.addStateStore(reduceStore, reduceName);
        return new KTableImpl(topology, reduceName, reduceSupplier, sourceNodes);
    }

    /** Non-windowed reduce with {@code null} serdes. */
    @Override
    public KTable<K, V> reduceByKey(Reducer<V> reducer, String name) {
        return reduceByKey(reducer, null, null, name);
    }

    /**
     * Windowed aggregation: adds a {@link KStreamWindowAggregate} node backed by a persistent
     * windowed store named {@code windows.name()}.
     *
     * @param initializer   supplies the initial aggregate
     * @param aggregator    folds a record into the aggregate
     * @param windows       window specification; also supplies the store name
     * @param keySerde      key serde for the store; may be {@code null}
     * @param aggValueSerde aggregate serde for the store; may be {@code null}
     * @return table backed by the aggregate node
     */
    @Override
    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Windows<W> windows, Serde<K> keySerde, Serde<T> aggValueSerde) {
        String aggregateName = topology.newName(AGGREGATE_NAME);
        KStreamWindowAggregate<K, V, T, W> aggregateSupplier = new KStreamWindowAggregate<K, V, T, W>(windows, windows.name(), initializer, aggregator);
        StateStoreSupplier aggregateStore = Stores.create(windows.name())
                .withKeys(keySerde)
                .withValues(aggValueSerde)
                .persistent()
                .windowed(windows.maintainMs(), windows.segments, false)
                .build();
        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
        topology.addStateStore(aggregateStore, aggregateName);
        return new KTableImpl(topology, aggregateName, aggregateSupplier, sourceNodes);
    }

    /** Windowed aggregation with {@code null} serdes. */
    @Override
    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Windows<W> windows) {
        return aggregateByKey(initializer, aggregator, windows, null, null);
    }

    /**
     * Non-windowed aggregation: adds a {@link KStreamAggregate} node backed by a persistent
     * store named {@code name}.
     *
     * @param initializer   supplies the initial aggregate
     * @param aggregator    folds a record into the aggregate
     * @param keySerde      key serde for the store; may be {@code null}
     * @param aggValueSerde aggregate serde for the store; may be {@code null}
     * @param name          store name
     * @return table backed by the aggregate node
     */
    @Override
    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Serde<K> keySerde, Serde<T> aggValueSerde, String name) {
        String aggregateName = topology.newName(AGGREGATE_NAME);
        KStreamAggregate<K, V, T> aggregateSupplier = new KStreamAggregate<K, V, T>(name, initializer, aggregator);
        StateStoreSupplier aggregateStore = Stores.create(name)
                .withKeys(keySerde)
                .withValues(aggValueSerde)
                .persistent()
                .build();
        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
        topology.addStateStore(aggregateStore, aggregateName);
        return new KTableImpl(topology, aggregateName, aggregateSupplier, sourceNodes);
    }

    /** Non-windowed aggregation with {@code null} serdes. */
    @Override
    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, String name) {
        return aggregateByKey(initializer, aggregator, null, null, name);
    }

    /**
     * Windowed count, expressed as an aggregation that starts at {@code 0L}, adds one per
     * record, and stores the result with {@code Serdes.Long()}.
     *
     * @param windows  window specification
     * @param keySerde key serde for the store; may be {@code null}
     * @return table of per-window counts
     */
    @Override
    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde) {
        return aggregateByKey(countInitializer(), KStreamImpl.<K, V>countAggregator(), windows, keySerde, Serdes.Long());
    }

    /** Windowed count with a {@code null} key serde. */
    @Override
    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows) {
        return countByKey(windows, null);
    }

    /**
     * Non-windowed count, expressed as an aggregation that starts at {@code 0L}, adds one per
     * record, and stores the result with {@code Serdes.Long()}.
     *
     * @param keySerde key serde for the store; may be {@code null}
     * @param name     store name
     * @return table of counts
     */
    @Override
    public KTable<K, Long> countByKey(Serde<K> keySerde, String name) {
        return aggregateByKey(countInitializer(), KStreamImpl.<K, V>countAggregator(), keySerde, Serdes.Long(), name);
    }

    /** Non-windowed count with a {@code null} key serde. */
    @Override
    public KTable<K, Long> countByKey(String name) {
        return countByKey(null, name);
    }

    /** Builds the persistent windowed store used by the stream-stream joins (third windowed() argument is true). */
    private static <KS, VS> StateStoreSupplier windowedJoinStore(String storeName, Serde<KS> keySerde, Serde<VS> valueSerde, JoinWindows windows) {
        return Stores.create(storeName)
                .withKeys(keySerde)
                .withValues(valueSerde)
                .persistent()
                .windowed(windows.maintainMs(), windows.segments, true)
                .build();
    }

    /** Initializer shared by the countByKey overloads: every count starts at zero. */
    private static Initializer<Long> countInitializer() {
        return new Initializer<Long>() {
            @Override
            public Long apply() {
                return 0L;
            }
        };
    }

    /** Aggregator shared by the countByKey overloads: adds one to the running count per record. */
    private static <K, V> Aggregator<K, V, Long> countAggregator() {
        return new Aggregator<K, V, Long>() {
            @Override
            public Long apply(K aggKey, V value, Long aggregate) {
                return aggregate + 1L;
            }
        };
    }
}

