/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.Assert;
import org.junit.Test;

public class ProcessorNodeTest {
    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
        ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
        node.init(null);
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
        ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
        node.close();
    }

    private void testSpecificMetrics(Metrics metrics, String groupName, String opName, Map<String, String> metricTags) {
        Assert.assertNotNull((Object)metrics.metrics().get(metrics.metricName(opName + "-latency-avg", groupName, "The average latency of " + opName + " operation.", metricTags)));
        Assert.assertNotNull((Object)metrics.metrics().get(metrics.metricName(opName + "-latency-max", groupName, "The max latency of " + opName + " operation.", metricTags)));
        Assert.assertNotNull((Object)metrics.metrics().get(metrics.metricName(opName + "-rate", groupName, "The average number of occurrence of " + opName + " operation per second.", metricTags)));
    }

    @Test
    public void testMetrics() {
        StateSerdes<Bytes, Bytes> anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
        Metrics metrics = new Metrics();
        MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()), metrics);
        ProcessorNode node = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
        node.init(context);
        String[] latencyOperations = new String[]{"process", "punctuate", "create", "destroy"};
        String throughputOperation = "forward";
        String groupName = "stream-processor-node-metrics";
        LinkedHashMap<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("processor-node-id", node.name());
        metricTags.put("task-id", context.taskId().toString());
        for (String operation : latencyOperations) {
            Assert.assertNotNull((Object)metrics.getSensor(operation));
        }
        Assert.assertNotNull((Object)metrics.getSensor(throughputOperation));
        for (String opName : latencyOperations) {
            this.testSpecificMetrics(metrics, groupName, opName, metricTags);
        }
        Assert.assertNotNull((Object)metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName, "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
        metricTags.put("processor-node-id", "all");
        for (String opName : latencyOperations) {
            this.testSpecificMetrics(metrics, groupName, opName, metricTags);
        }
        Assert.assertNotNull((Object)metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName, "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
        context.close();
    }

    private static class NoOpProcessor
    implements Processor {
        private NoOpProcessor() {
        }

        @Override
        public void init(ProcessorContext context) {
        }

        public void process(Object key, Object value) {
        }

        @Override
        public void punctuate(long timestamp) {
        }

        @Override
        public void close() {
        }
    }

    private static class ExceptionalProcessor
    implements Processor {
        private ExceptionalProcessor() {
        }

        @Override
        public void init(ProcessorContext context) {
            throw new RuntimeException();
        }

        public void process(Object key, Object value) {
            throw new RuntimeException();
        }

        @Override
        public void punctuate(long timestamp) {
            throw new RuntimeException();
        }

        @Override
        public void close() {
            throw new RuntimeException();
        }
    }
}

