/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.util.LoggingContext;
import java.time.Duration;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceConnector;
import org.fest.assertions.Assertions;
import org.junit.Assert;
import org.junit.Test;

public class ErrorHandlerTest {
    @Test
    public void noError() throws Exception {
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        this.poll(queue);
    }

    @Test
    public void nonRetriableByDefault() throws Exception {
        Configuration config = Configuration.empty();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        ErrorHandler errorHandler = this.errorHandler(config, queue);
        IllegalArgumentException error = new IllegalArgumentException("This is my error");
        errorHandler.setProducerThrowable((Throwable)error);
        try {
            this.poll(queue);
            Assert.fail((String)"Exception must be thrown");
        }
        catch (ConnectException e) {
            Assertions.assertThat((boolean)(e instanceof RetriableException)).isFalse();
        }
    }

    private void poll(ChangeEventQueue<DataChangeEvent> queue) throws InterruptedException {
        for (int i = 0; i < 10; ++i) {
            queue.poll();
            Thread.sleep(100L);
        }
    }

    @Test
    public void customRetriableMatch() throws Exception {
        Configuration config = ((Configuration.Builder)Configuration.create().with(CommonConnectorConfig.CUSTOM_RETRIABLE_EXCEPTION, ".*my error.*")).build();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        ErrorHandler errorHandler = this.errorHandler(config, queue);
        IllegalArgumentException error = new IllegalArgumentException("This is my error to retry");
        errorHandler.setProducerThrowable((Throwable)error);
        try {
            this.poll(queue);
            Assert.fail((String)"Exception must be thrown");
        }
        catch (ConnectException e) {
            Assertions.assertThat((boolean)(e instanceof RetriableException)).isTrue();
        }
    }

    @Test
    public void customRetriableNoMatch() throws Exception {
        Configuration config = ((Configuration.Builder)Configuration.create().with(CommonConnectorConfig.CUSTOM_RETRIABLE_EXCEPTION, ".*not my error.*")).build();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        ErrorHandler errorHandler = this.errorHandler(config, queue);
        IllegalArgumentException error = new IllegalArgumentException("This is my error to retry");
        errorHandler.setProducerThrowable((Throwable)error);
        try {
            this.poll(queue);
            Assert.fail((String)"Exception must be thrown");
        }
        catch (ConnectException e) {
            Assertions.assertThat((boolean)(e instanceof RetriableException)).isFalse();
        }
    }

    @Test
    public void customRetriableMatchNested() throws Exception {
        Configuration config = ((Configuration.Builder)Configuration.create().with(CommonConnectorConfig.CUSTOM_RETRIABLE_EXCEPTION, ".*my error.*")).build();
        ChangeEventQueue<DataChangeEvent> queue = this.queue();
        ErrorHandler errorHandler = this.errorHandler(config, queue);
        IllegalArgumentException error = new IllegalArgumentException("This is my error to retry");
        errorHandler.setProducerThrowable((Throwable)new Exception("Main", error));
        try {
            this.poll(queue);
            Assert.fail((String)"Exception must be thrown");
        }
        catch (ConnectException e) {
            Assertions.assertThat((boolean)(e instanceof RetriableException)).isTrue();
        }
    }

    private ErrorHandler errorHandler(Configuration config, ChangeEventQueue<DataChangeEvent> queue) {
        ErrorHandler errorHandler = new ErrorHandler(SourceConnector.class, (CommonConnectorConfig)new TestConnectorConfig(config), queue);
        return errorHandler;
    }

    private ChangeEventQueue<DataChangeEvent> queue() {
        ChangeEventQueue queue = new ChangeEventQueue.Builder().pollInterval(Duration.ofMillis(1L)).maxBatchSize(1000).maxQueueSize(1000).loggingContextSupplier(() -> LoggingContext.forConnector((String)"test", (String)"test", (String)"test")).build();
        return queue;
    }

    private static final class TestConnectorConfig
    extends CommonConnectorConfig {
        protected TestConnectorConfig(Configuration config) {
            super(config, 0);
        }

        public String getContextName() {
            return "test";
        }

        public String getConnectorName() {
            return "test";
        }

        protected SourceInfoStructMaker<?> getSourceInfoStructMaker(CommonConnectorConfig.Version version) {
            return null;
        }
    }
}

