/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class ProcessingExceptionHandlerIntegrationTest {
    private final String threadId = Thread.currentThread().getName();
    private static final Instant TIMESTAMP = Instant.now();

    @Test
    public void shouldFailWhenProcessingExceptionOccursIfExceptionHandlerReturnsFail() {
        List<KeyValue> events = Arrays.asList(new KeyValue((Object)"ID123-1", (Object)"ID123-A1"), new KeyValue((Object)"ID123-2-ERR", (Object)"ID123-A2"), new KeyValue((Object)"ID123-3", (Object)"ID123-A3"), new KeyValue((Object)"ID123-4", (Object)"ID123-A4"));
        List<KeyValueTimestamp<String, String>> expectedProcessedRecords = Collections.singletonList(new KeyValueTimestamp<String, String>("ID123-1", "ID123-A1", TIMESTAMP.toEpochMilli()));
        MockProcessorSupplier processor = new MockProcessorSupplier();
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("TOPIC_NAME", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).map(KeyValue::new).mapValues(value -> value).process(this.runtimeErrorProcessorSupplierMock(), new String[0]).process(processor, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", FailProcessingExceptionHandlerMockTest.class);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L));){
            TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            StreamsException exception = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO));
            Assertions.assertTrue((boolean)exception.getMessage().contains("Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, partition=0, offset=1"));
            Assertions.assertEquals((int)1, (int)processor.theCapturedProcessor().processed().size());
            Assertions.assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed());
            MetricName dropTotal = this.droppedRecordsTotalMetric();
            MetricName dropRate = this.droppedRecordsRateMetric();
            Assertions.assertEquals((Object)0.0, (Object)((Metric)driver.metrics().get(dropTotal)).metricValue());
            Assertions.assertEquals((Object)0.0, (Object)((Metric)driver.metrics().get(dropRate)).metricValue());
        }
    }

    @Test
    public void shouldFailWhenProcessingExceptionOccursFromFlushingCacheIfExceptionHandlerReturnsFail() {
        List<KeyValue> events = Arrays.asList(new KeyValue((Object)"ID123-1", (Object)"ID123-A1"), new KeyValue((Object)"ID123-1", (Object)"ID123-A2"), new KeyValue((Object)"ID123-1", (Object)"ID123-A3"), new KeyValue((Object)"ID123-1", (Object)"ID123-A4"));
        List<KeyValueTimestamp> expectedProcessedRecords = Arrays.asList(new KeyValueTimestamp<String, String>("ID123-1", "1", TIMESTAMP.toEpochMilli()), new KeyValueTimestamp<String, String>("ID123-1", "2", TIMESTAMP.toEpochMilli()));
        MockProcessorSupplier processor = new MockProcessorSupplier();
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("TOPIC_NAME", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey().count().toStream().mapValues(value -> value.toString()).process(this.runtimeErrorProcessorSupplierMock(), new String[0]).process(processor, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", LogAndFailProcessingExceptionHandler.class);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L));){
            TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            StreamsException exception = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO));
            Assertions.assertTrue((boolean)exception.getMessage().contains("Failed to flush cache of store KSTREAM-AGGREGATE-STATE-STORE-0000000001"));
            Assertions.assertEquals((int)expectedProcessedRecords.size(), (int)processor.theCapturedProcessor().processed().size());
            Assertions.assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed());
            MetricName dropTotal = this.droppedRecordsTotalMetric();
            MetricName dropRate = this.droppedRecordsRateMetric();
            Assertions.assertEquals((Object)0.0, (Object)((Metric)driver.metrics().get(dropTotal)).metricValue());
            Assertions.assertEquals((Object)0.0, (Object)((Metric)driver.metrics().get(dropRate)).metricValue());
        }
    }

    @Test
    public void shouldContinueWhenProcessingExceptionOccursIfExceptionHandlerReturnsContinue() {
        List<KeyValue> events = Arrays.asList(new KeyValue((Object)"ID123-1", (Object)"ID123-A1"), new KeyValue((Object)"ID123-2-ERR", (Object)"ID123-A2"), new KeyValue((Object)"ID123-3", (Object)"ID123-A3"), new KeyValue((Object)"ID123-4", (Object)"ID123-A4"), new KeyValue((Object)"ID123-5-ERR", (Object)"ID123-A5"), new KeyValue((Object)"ID123-6", (Object)"ID123-A6"));
        List<KeyValueTimestamp> expectedProcessedRecords = Arrays.asList(new KeyValueTimestamp<String, String>("ID123-1", "ID123-A1", TIMESTAMP.toEpochMilli()), new KeyValueTimestamp<String, String>("ID123-3", "ID123-A3", TIMESTAMP.toEpochMilli()), new KeyValueTimestamp<String, String>("ID123-4", "ID123-A4", TIMESTAMP.toEpochMilli()), new KeyValueTimestamp<String, String>("ID123-6", "ID123-A6", TIMESTAMP.toEpochMilli()));
        MockProcessorSupplier processor = new MockProcessorSupplier();
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("TOPIC_NAME", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).map(KeyValue::new).mapValues(value -> value).process(this.runtimeErrorProcessorSupplierMock(), new String[0]).process(processor, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", ContinueProcessingExceptionHandlerMockTest.class);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L));){
            TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO);
            Assertions.assertEquals((int)expectedProcessedRecords.size(), (int)processor.theCapturedProcessor().processed().size());
            Assertions.assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed());
            MetricName dropTotal = this.droppedRecordsTotalMetric();
            MetricName dropRate = this.droppedRecordsRateMetric();
            Assertions.assertEquals((Object)2.0, (Object)((Metric)driver.metrics().get(dropTotal)).metricValue());
            Assertions.assertTrue(((Double)((Metric)driver.metrics().get(dropRate)).metricValue() > 0.0 ? 1 : 0) != 0);
        }
    }

    @Test
    public void shouldContinueWhenProcessingExceptionOccursFromFlushingCacheIfExceptionHandlerReturnsContinue() {
        List<KeyValue> events = Arrays.asList(new KeyValue((Object)"ID123-1", (Object)"ID123-A1"), new KeyValue((Object)"ID123-1", (Object)"ID123-A2"), new KeyValue((Object)"ID123-1", (Object)"ID123-A3"), new KeyValue((Object)"ID123-1", (Object)"ID123-A4"));
        List<KeyValueTimestamp> expectedProcessedRecords = Arrays.asList(new KeyValueTimestamp<String, String>("ID123-1", "1", TIMESTAMP.toEpochMilli()), new KeyValueTimestamp<String, String>("ID123-1", "2", TIMESTAMP.toEpochMilli()), new KeyValueTimestamp<String, String>("ID123-1", "4", TIMESTAMP.toEpochMilli()));
        MockProcessorSupplier processor = new MockProcessorSupplier();
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("TOPIC_NAME", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey().count().toStream().mapValues(value -> value.toString()).process(this.runtimeErrorProcessorSupplierMock(), new String[0]).process(processor, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", LogAndContinueProcessingExceptionHandler.class);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L));){
            TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO);
            Assertions.assertEquals((int)expectedProcessedRecords.size(), (int)processor.theCapturedProcessor().processed().size());
            Assertions.assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed());
            MetricName dropTotal = this.droppedRecordsTotalMetric();
            MetricName dropRate = this.droppedRecordsRateMetric();
            Assertions.assertEquals((Object)1.0, (Object)((Metric)driver.metrics().get(dropTotal)).metricValue());
            Assertions.assertTrue(((Double)((Metric)driver.metrics().get(dropRate)).metricValue() > 0.0 ? 1 : 0) != 0);
        }
    }

    @Test
    public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler() {
        KeyValue event = new KeyValue((Object)"ID123-1", (Object)"ID123-A1");
        KeyValue eventError = new KeyValue((Object)"ID123-2-ERR", (Object)"ID123-A2");
        MockProcessorSupplier processor = new MockProcessorSupplier();
        StreamsBuilder builder = new StreamsBuilder();
        AtomicBoolean isExecuted = new AtomicBoolean(false);
        builder.stream("TOPIC_NAME", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).map(KeyValue::new).mapValues(value -> value).process(this.runtimeErrorProcessorSupplierMock(), new String[0]).map((k, v) -> {
            isExecuted.set(true);
            return KeyValue.pair((Object)k, (Object)v);
        }).process(processor, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", FailProcessingExceptionHandlerMockTest.class);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L));){
            TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            isExecuted.set(false);
            inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
            Assertions.assertTrue((boolean)isExecuted.get());
            isExecuted.set(false);
            StreamsException e = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, TIMESTAMP));
            Assertions.assertTrue((boolean)e.getMessage().contains("Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, partition=0, offset=1"));
            Assertions.assertFalse((boolean)isExecuted.get());
        }
    }

    @Test
    public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInContinueProcessingExceptionHandler() {
        KeyValue event = new KeyValue((Object)"ID123-1", (Object)"ID123-A1");
        KeyValue eventFalse = new KeyValue((Object)"ID123-2-ERR", (Object)"ID123-A2");
        MockProcessorSupplier processor = new MockProcessorSupplier();
        StreamsBuilder builder = new StreamsBuilder();
        AtomicBoolean isExecuted = new AtomicBoolean(false);
        builder.stream("TOPIC_NAME", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).map(KeyValue::new).mapValues(value -> value).process(this.runtimeErrorProcessorSupplierMock(), new String[0]).map((k, v) -> {
            isExecuted.set(true);
            return KeyValue.pair((Object)k, (Object)v);
        }).process(processor, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", ContinueProcessingExceptionHandlerMockTest.class);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L));){
            TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            isExecuted.set(false);
            inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
            Assertions.assertTrue((boolean)isExecuted.get());
            isExecuted.set(false);
            inputTopic.pipeInput(eventFalse.key, eventFalse.value, TIMESTAMP);
            Assertions.assertFalse((boolean)isExecuted.get());
        }
    }

    @Test
    public void shouldStopProcessingWhenProcessingExceptionHandlerReturnsNull() {
        KeyValue event = new KeyValue((Object)"ID123-1", (Object)"ID123-A1");
        KeyValue eventError = new KeyValue((Object)"ID123-ERR-NULL", (Object)"ID123-A2");
        MockProcessorSupplier processor = new MockProcessorSupplier();
        StreamsBuilder builder = new StreamsBuilder();
        AtomicBoolean isExecuted = new AtomicBoolean(false);
        builder.stream("TOPIC_NAME", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).map(KeyValue::new).mapValues(value -> value).process(this.runtimeErrorProcessorSupplierMock(), new String[0]).map((k, v) -> {
            isExecuted.set(true);
            return KeyValue.pair((Object)k, (Object)v);
        }).process(processor, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", ContinueProcessingExceptionHandlerMockTest.class);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L));){
            TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            isExecuted.set(false);
            inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
            Assertions.assertTrue((boolean)isExecuted.get());
            isExecuted.set(false);
            StreamsException e = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
            Assertions.assertEquals((Object)"Fatal user code error in processing error callback", (Object)e.getMessage());
            Assertions.assertInstanceOf(NullPointerException.class, (Object)e.getCause());
            Assertions.assertEquals((Object)"Invalid ProductionExceptionHandler response.", (Object)e.getCause().getMessage());
            Assertions.assertFalse((boolean)isExecuted.get());
        }
    }

    @Test
    public void shouldStopProcessingWhenFatalUserExceptionProcessingExceptionHandler() {
        KeyValue event = new KeyValue((Object)"ID123-1", (Object)"ID123-A1");
        KeyValue eventError = new KeyValue((Object)"ID123-ERR-FATAL", (Object)"ID123-A2");
        MockProcessorSupplier processor = new MockProcessorSupplier();
        StreamsBuilder builder = new StreamsBuilder();
        AtomicBoolean isExecuted = new AtomicBoolean(false);
        builder.stream("TOPIC_NAME", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).map(KeyValue::new).mapValues(value -> value).process(this.runtimeErrorProcessorSupplierMock(), new String[0]).map((k, v) -> {
            isExecuted.set(true);
            return KeyValue.pair((Object)k, (Object)v);
        }).process(processor, new String[0]);
        Properties properties = new Properties();
        properties.put("processing.exception.handler", ContinueProcessingExceptionHandlerMockTest.class);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L));){
            TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            isExecuted.set(false);
            inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
            Assertions.assertTrue((boolean)isExecuted.get());
            isExecuted.set(false);
            StreamsException e = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
            Assertions.assertEquals((Object)"Fatal user code error in processing error callback", (Object)e.getMessage());
            Assertions.assertEquals((Object)"KABOOM!", (Object)e.getCause().getMessage());
            Assertions.assertFalse((boolean)isExecuted.get());
        }
    }

    private static void assertProcessingExceptionHandlerInputs(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
        Assertions.assertTrue((boolean)Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains((String)record.key()));
        Assertions.assertTrue((boolean)Arrays.asList("ID123-A2", "ID123-A5").contains((String)record.value()));
        Assertions.assertEquals((Object)"TOPIC_NAME", (Object)context.topic());
        Assertions.assertEquals((Object)"KSTREAM-PROCESSOR-0000000003", (Object)context.processorNodeId());
        Assertions.assertEquals((long)TIMESTAMP.toEpochMilli(), (long)context.timestamp());
        Assertions.assertTrue((boolean)exception.getMessage().contains("Exception should be handled by processing exception handler"));
    }

    private MetricName droppedRecordsTotalMetric() {
        return new MetricName("dropped-records-total", "stream-task-metrics", "The total number of dropped records", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
    }

    private MetricName droppedRecordsRateMetric() {
        return new MetricName("dropped-records-rate", "stream-task-metrics", "The average number of dropped records per second", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
    }

    private ProcessorSupplier<String, String, String, String> runtimeErrorProcessorSupplierMock() {
        return () -> new ContextualProcessor<String, String, String, String>(){

            public void process(Record<String, String> record) {
                if (((String)record.key()).contains("ERR") || ((String)record.value()).equals("3")) {
                    throw new RuntimeException("Exception should be handled by processing exception handler");
                }
                this.context().forward(new Record(record.key(), record.value(), record.timestamp()));
            }
        };
    }

    public static class FailProcessingExceptionHandlerMockTest
    implements ProcessingExceptionHandler {
        public ProcessingExceptionHandler.ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
            ProcessingExceptionHandlerIntegrationTest.assertProcessingExceptionHandlerInputs(context, record, exception);
            return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
        }

        public void configure(Map<String, ?> configs) {
        }
    }

    public static class ContinueProcessingExceptionHandlerMockTest
    implements ProcessingExceptionHandler {
        public ProcessingExceptionHandler.ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
            if (((String)record.key()).contains("FATAL")) {
                throw new RuntimeException("KABOOM!");
            }
            if (((String)record.key()).contains("NULL")) {
                return null;
            }
            ProcessingExceptionHandlerIntegrationTest.assertProcessingExceptionHandlerInputs(context, record, exception);
            return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
        }

        public void configure(Map<String, ?> configs) {
        }
    }
}

