/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

/**
 * A named node that wraps an optional {@link Processor}, keeps track of its child nodes
 * (both in insertion order and by name), and measures the latency of the processor's
 * {@code init}, {@code process}, {@code punctuate} and {@code close} calls through sensors
 * that are registered in {@link #init(InternalProcessorContext)} and removed in
 * {@link #close()}.
 *
 * <p>NOTE(review): the record being processed is handed to the reusable delegate through
 * the instance fields {@code key}/{@code value} without synchronization, so concurrent
 * {@link #process(Object, Object)} calls on the same node would race. Presumably each node
 * is confined to a single thread -- confirm against the callers.
 *
 * @param <K> key type accepted by the wrapped processor
 * @param <V> value type accepted by the wrapped processor
 */
public class ProcessorNode<K, V> {
    private final List<ProcessorNode<?, ?>> children;
    private final Map<String, ProcessorNode<?, ?>> childByName;
    // Created in init(); stays null until then, so the metric-backed methods need init() first.
    private NodeMetrics nodeMetrics;
    // May be null (see the single-argument constructor).
    private final Processor<K, V> processor;
    private final String name;
    private final Time time;
    // Record currently being processed; written by process() and read by processDelegate.
    private K key;
    private V value;
    // Reused for every process() call so that no Runnable is allocated per record.
    // Does not null-check the processor, unlike the init/close delegates.
    private final Runnable processDelegate = new Runnable() {

        @Override
        public void run() {
            processor.process(key, value);
        }
    };
    private ProcessorContext context;
    private final Runnable initDelegate = new Runnable() {

        @Override
        public void run() {
            if (processor != null) {
                processor.init(context);
            }
        }
    };
    private final Runnable closeDelegate = new Runnable() {

        @Override
        public void run() {
            if (processor != null) {
                processor.close();
            }
        }
    };
    /** Names of the state stores associated with this node; may be {@code null}. */
    public final Set<String> stateStores;

    /**
     * Creates a node without a processor and without state stores.
     *
     * @param name the node name
     */
    public ProcessorNode(String name) {
        this(name, null, null);
    }

    /**
     * Creates a node wrapping the given processor.
     *
     * @param name        the node name
     * @param processor   the processor to delegate to; may be {@code null}
     * @param stateStores names of the associated state stores; may be {@code null}
     */
    public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
        this.name = name;
        this.processor = processor;
        // Explicit type arguments instead of raw types, to avoid unchecked conversions.
        this.children = new ArrayList<ProcessorNode<?, ?>>();
        this.childByName = new HashMap<String, ProcessorNode<?, ?>>();
        this.stateStores = stateStores;
        this.time = new SystemTime();
    }

    /**
     * @return the node name
     */
    public final String name() {
        return this.name;
    }

    /**
     * @return the wrapped processor, or {@code null} if none was supplied
     */
    public final Processor<K, V> processor() {
        return this.processor;
    }

    /**
     * @return the live (not copied) list of children, in the order they were added
     */
    public final List<ProcessorNode<?, ?>> children() {
        return this.children;
    }

    /**
     * Looks up a child by name.
     *
     * <p>The raw return type is kept deliberately so that existing callers keep compiling.
     *
     * @param childName name of the child node
     * @return the child registered under that name, or {@code null} if there is none
     */
    final ProcessorNode getChild(String childName) {
        return this.childByName.get(childName);
    }

    /**
     * Registers a child node in both the ordered list and the by-name index. A later child
     * with the same name replaces the earlier one in the index but not in the list.
     *
     * @param child the node to add
     */
    public void addChild(ProcessorNode<?, ?> child) {
        this.children.add(child);
        this.childByName.put(child.name, child);
    }

    /**
     * Stores the context, registers this node's sensors and initializes the wrapped
     * processor (if any), recording the latency on the "create" sensor.
     *
     * @param context the context handed to the processor and used to obtain the metrics
     * @throws StreamsException wrapping any exception raised while setting up
     */
    public void init(InternalProcessorContext context) {
        this.context = context;
        try {
            this.nodeMetrics = new NodeMetrics(context.metrics(), this.name, context);
            runAndMeasureLatency(this.time, this.initDelegate, this.nodeMetrics.nodeCreationSensor);
        }
        catch (Exception e) {
            throw new StreamsException(String.format("failed to initialize processor %s", this.name), e);
        }
    }

    /**
     * Closes the wrapped processor (if any), recording the latency on the "destroy" sensor,
     * and then removes all sensors of this node.
     *
     * <p>The sensors are removed even when closing the processor fails, so a failing
     * processor does not leave them registered. If this node was never initialized, the
     * resulting {@link NullPointerException} is wrapped like any other failure.
     *
     * @throws StreamsException wrapping any exception raised while closing
     */
    public void close() {
        try {
            try {
                runAndMeasureLatency(this.time, this.closeDelegate, this.nodeMetrics.nodeDestructionSensor);
            }
            finally {
                // Guarded so that the "never initialized" failure above is not replaced
                // by a second NullPointerException from the cleanup itself.
                if (this.nodeMetrics != null) {
                    this.nodeMetrics.removeAllSensors();
                }
            }
        }
        catch (Exception e) {
            throw new StreamsException(String.format("failed to close processor %s", this.name), e);
        }
    }

    /**
     * Passes one record to the wrapped processor, recording the latency on the "process"
     * sensor. Requires {@link #init(InternalProcessorContext)} to have been called and a
     * non-null processor; neither is checked here.
     *
     * @param key   the record key
     * @param value the record value
     */
    public void process(K key, V value) {
        this.key = key;
        this.value = value;
        runAndMeasureLatency(this.time, this.processDelegate, this.nodeMetrics.nodeProcessTimeSensor);
    }

    /**
     * Invokes the given punctuator, recording the latency on the "punctuate" sensor.
     *
     * @param timestamp  the timestamp passed to the punctuator
     * @param punctuator the callback to invoke
     */
    public void punctuate(final long timestamp, final Punctuator punctuator) {
        Runnable punctuateDelegate = new Runnable() {

            @Override
            public void run() {
                punctuator.punctuate(timestamp);
            }
        };
        runAndMeasureLatency(this.time, punctuateDelegate, this.nodeMetrics.nodePunctuateTimeSensor);
    }

    /**
     * @return the description produced by {@link #toString(String)} with an empty indent
     */
    @Override
    public String toString() {
        return this.toString("");
    }

    /**
     * Builds a description consisting of the node name and, when there are any, the names
     * of its state stores.
     *
     * @param indent prefix put in front of every emitted line
     * @return the formatted description
     */
    public String toString(String indent) {
        StringBuilder sb = new StringBuilder(indent + this.name + ":\n");
        if (this.stateStores != null && !this.stateStores.isEmpty()) {
            sb.append(indent).append("\tstates:\t\t[");
            boolean first = true;
            for (String store : this.stateStores) {
                if (!first) {
                    sb.append(", ");
                }
                sb.append(store);
                first = false;
            }
            sb.append("]\n");
        }
        return sb.toString();
    }

    /**
     * @return the "forward" throughput sensor of this node; requires a prior
     *         {@link #init(InternalProcessorContext)}
     */
    Sensor sourceNodeForwardSensor() {
        return this.nodeMetrics.sourceNodeForwardSensor;
    }

    /**
     * Runs the action and, if the sensor is recording, records the elapsed nanoseconds.
     * Nothing is recorded when the action throws.
     */
    private static void runAndMeasureLatency(Time time, Runnable action, Sensor sensor) {
        // -1 marks "not recording", which avoids reading the clock when the sensor is off.
        long startNs = -1L;
        if (sensor.shouldRecord()) {
            startNs = time.nanoseconds();
        }
        action.run();
        if (startNs != -1L) {
            sensor.record((double)(time.nanoseconds() - startNs));
        }
    }

    /**
     * Holds the sensors of a single node so they can be registered and removed together.
     */
    private static final class NodeMetrics {
        private final StreamsMetricsImpl metrics;
        private final Sensor nodeProcessTimeSensor;
        private final Sensor nodePunctuateTimeSensor;
        private final Sensor sourceNodeForwardSensor;
        private final Sensor nodeCreationSensor;
        private final Sensor nodeDestructionSensor;

        /**
         * Registers all sensors of the node at DEBUG recording level, tagged with the task id.
         */
        private NodeMetrics(StreamsMetricsImpl metrics, String processorNodeName, ProcessorContext context) {
            this.metrics = metrics;
            // Resolved once instead of twice per sensor.
            final String taskName = context.taskId().toString();
            final String scope = "processor-node";
            final String tagKey = "task-id";
            this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(taskName, scope, processorNodeName, "process", Sensor.RecordingLevel.DEBUG, tagKey, taskName);
            this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(taskName, scope, processorNodeName, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, taskName);
            this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(taskName, scope, processorNodeName, "create", Sensor.RecordingLevel.DEBUG, tagKey, taskName);
            this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(taskName, scope, processorNodeName, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, taskName);
            this.sourceNodeForwardSensor = metrics.addThroughputSensor(taskName, scope, processorNodeName, "forward", Sensor.RecordingLevel.DEBUG, tagKey, taskName);
        }

        /**
         * Removes every sensor registered by the constructor.
         */
        private void removeAllSensors() {
            this.metrics.removeSensor(this.nodeProcessTimeSensor);
            this.metrics.removeSensor(this.nodePunctuateTimeSensor);
            this.metrics.removeSensor(this.sourceNodeForwardSensor);
            this.metrics.removeSensor(this.nodeCreationSensor);
            this.metrics.removeSensor(this.nodeDestructionSensor);
        }
    }
}

