/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.RecordCollector;

public class SinkNode<K, V>
extends ProcessorNode<K, V> {
    private final String topic;
    private Serializer<K> keySerializer;
    private Serializer<V> valSerializer;
    private final StreamPartitioner<? super K, ? super V> partitioner;
    private ProcessorContext context;

    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
        super(name);
        this.topic = topic;
        this.keySerializer = keySerializer;
        this.valSerializer = valSerializer;
        this.partitioner = partitioner;
    }

    @Override
    public void addChild(ProcessorNode<?, ?> child) {
        throw new UnsupportedOperationException("sink node does not allow addChild");
    }

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        this.context = context;
        if (this.keySerializer == null) {
            this.keySerializer = context.keySerde().serializer();
        }
        if (this.valSerializer == null) {
            this.valSerializer = context.valueSerde().serializer();
        }
        if (this.valSerializer instanceof ChangedSerializer && ((ChangedSerializer)this.valSerializer).inner() == null) {
            ((ChangedSerializer)this.valSerializer).setInner(context.valueSerde().serializer());
        }
    }

    @Override
    public void process(K key, V value) {
        RecordCollector collector = ((RecordCollector.Supplier)((Object)this.context)).recordCollector();
        long timestamp = this.context.timestamp();
        if (timestamp < 0L) {
            throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">.");
        }
        try {
            collector.send(this.topic, key, value, null, timestamp, this.keySerializer, this.valSerializer, this.partitioner);
        }
        catch (ClassCastException e) {
            throw new StreamsException(String.format("A serializer (key: %s / value: %s) is not compatible to the actual key or value type (key type: %s / value type: %s). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.", this.keySerializer.getClass().getName(), this.valSerializer.getClass().getName(), key.getClass().getName(), value.getClass().getName()), e);
        }
    }

    @Override
    public String toString() {
        return this.toString("");
    }

    @Override
    public String toString(String indent) {
        StringBuilder sb = new StringBuilder(super.toString(indent));
        sb.append(indent).append("\ttopic:\t\t");
        sb.append(this.topic);
        sb.append("\n");
        return sb.toString();
    }
}

