/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.util.LoggingContext;
import java.io.IOException;
import java.time.Duration;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceConnector;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;

public class ErrorHandlerTest {
    @Test
    public void noError() throws Exception {
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        this.poll(queue);
    }

    @Test
    public void nonRetriableByDefault() throws Exception {
        Configuration config = Configuration.empty();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        IllegalArgumentException error = new IllegalArgumentException("This is my error");
        this.initErrorHandler(config, queue, error);
        this.pollAndAssertNonRetriable(queue);
    }

    @Test
    public void isRetriable() throws Exception {
        Configuration config = Configuration.create().build();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        IOException error = new IOException("This is my error");
        this.initErrorHandler(config, queue, error);
        this.pollAndAssertRetriable(queue);
    }

    @Test
    public void isRetryingWithMaxTimes() throws Exception {
        Configuration config = ((Configuration.Builder)Configuration.create().with(CommonConnectorConfig.MAX_RETRIES_ON_ERROR, 2)).build();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        IOException error = new IOException("This is my error");
        ErrorHandler errorHandler = this.initErrorHandler(config, queue, error);
        this.pollAndAssertRetriable(queue);
        errorHandler = this.replaceErrorHandler(config, queue, new Exception(error), errorHandler);
        this.pollAndAssertRetriable(queue);
        this.replaceErrorHandler(config, queue, error, errorHandler);
        this.pollAndAssertNonRetriable(queue);
    }

    @Test
    public void isNotRetryingWithMaxRetries() throws Exception {
        Configuration config = ((Configuration.Builder)Configuration.create().with(CommonConnectorConfig.MAX_RETRIES_ON_ERROR, 2)).build();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        IOException error = new IOException("This is my error");
        ErrorHandler errorHandler = this.initErrorHandler(config, queue, error);
        this.pollAndAssertRetriable(queue);
        Exception fatalError = new Exception("This is fatal");
        this.replaceErrorHandler(config, queue, fatalError, errorHandler);
        this.pollAndAssertNonRetriable(queue);
    }

    @Test
    public void customRetriableMatch() throws Exception {
        Configuration config = ((Configuration.Builder)Configuration.create().with(CommonConnectorConfig.CUSTOM_RETRIABLE_EXCEPTION, ".*my error.*")).build();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        IllegalArgumentException error = new IllegalArgumentException("This is my error to retry");
        this.initErrorHandler(config, queue, error);
        this.pollAndAssertRetriable(queue);
    }

    @Test
    public void customRetriableNoMatch() throws Exception {
        Configuration config = ((Configuration.Builder)Configuration.create().with(CommonConnectorConfig.CUSTOM_RETRIABLE_EXCEPTION, ".*not my error.*")).build();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        IllegalArgumentException error = new IllegalArgumentException("This is my error to retry");
        this.initErrorHandler(config, queue, error);
        this.pollAndAssertNonRetriable(queue);
    }

    @Test
    public void customRetriableMatchNested() throws Exception {
        Configuration config = ((Configuration.Builder)Configuration.create().with(CommonConnectorConfig.CUSTOM_RETRIABLE_EXCEPTION, ".*my error.*")).build();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        IllegalArgumentException error = new IllegalArgumentException("This is my error to retry");
        this.initErrorHandler(config, queue, new Exception("Main", error));
        this.pollAndAssertRetriable(queue);
    }

    private void pollAndAssertRetriable(ChangeEventQueue<DataChangeEvent> queue) throws Exception {
        try {
            this.poll(queue);
            Assert.fail((String)"Exception must be thrown");
        }
        catch (ConnectException e) {
            Assertions.assertThat((Throwable)e).isInstanceOf(RetriableException.class);
        }
    }

    private void pollAndAssertNonRetriable(ChangeEventQueue<DataChangeEvent> queue) throws Exception {
        try {
            this.poll(queue);
            Assert.fail((String)"Exception must be thrown");
        }
        catch (ConnectException e) {
            Assertions.assertThat((Throwable)e).isNotInstanceOf(RetriableException.class);
        }
    }

    private void poll(ChangeEventQueue<DataChangeEvent> queue) throws InterruptedException {
        for (int i = 0; i < 10; ++i) {
            queue.poll();
            Thread.sleep(100L);
        }
    }

    private ErrorHandler initErrorHandler(Configuration config, ChangeEventQueue<DataChangeEvent> queue, Throwable producerThrowable) {
        return this.replaceErrorHandler(config, queue, producerThrowable, null);
    }

    private ErrorHandler replaceErrorHandler(Configuration config, ChangeEventQueue<DataChangeEvent> queue, Throwable producerThrowable, ErrorHandler replacedErrorHandler) {
        ErrorHandler errorHandler = new ErrorHandler(SourceConnector.class, (CommonConnectorConfig)new TestConnectorConfig(config), queue, replacedErrorHandler);
        if (producerThrowable != null) {
            errorHandler.setProducerThrowable(producerThrowable);
        }
        return errorHandler;
    }

    private ChangeEventQueue<DataChangeEvent> queue() {
        ChangeEventQueue queue = new ChangeEventQueue.Builder().pollInterval(Duration.ofMillis(1L)).maxBatchSize(1000).maxQueueSize(1000).loggingContextSupplier(() -> LoggingContext.forConnector((String)"test", (String)"test", (String)"test")).build();
        return queue;
    }

    private static final class TestConnectorConfig
    extends CommonConnectorConfig {
        protected TestConnectorConfig(Configuration config) {
            super(config, 0);
        }

        public String getContextName() {
            return "test";
        }

        public String getConnectorName() {
            return "test";
        }

        protected SourceInfoStructMaker<?> getSourceInfoStructMaker(CommonConnectorConfig.Version version) {
            return null;
        }
    }
}

