/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.common;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class BaseSourceTaskTest {
    private final MyBaseSourceTask baseSourceTask = new MyBaseSourceTask();

    @Before
    public void setup() {
        this.baseSourceTask.initialize((SourceTaskContext)Mockito.mock(SourceTaskContext.class));
    }

    @Test
    public void verifyTaskStartsAndStops() throws InterruptedException {
        this.baseSourceTask.start(new HashMap());
        Assert.assertEquals((Object)BaseSourceTask.State.RUNNING, (Object)this.baseSourceTask.getTaskState());
        this.baseSourceTask.poll();
        Assert.assertEquals((Object)BaseSourceTask.State.RUNNING, (Object)this.baseSourceTask.getTaskState());
        this.baseSourceTask.stop();
        Assert.assertEquals((Object)BaseSourceTask.State.STOPPED, (Object)this.baseSourceTask.getTaskState());
        Assert.assertEquals((long)1L, (long)this.baseSourceTask.startCount.get());
        Assert.assertEquals((long)1L, (long)this.baseSourceTask.stopCount.get());
        ((ChangeEventSourceCoordinator)Mockito.verify(this.baseSourceTask.coordinator)).stop();
    }

    @Test
    public void verifyStartAndStopWithoutPolling() {
        this.baseSourceTask.initialize((SourceTaskContext)Mockito.mock(SourceTaskContext.class));
        this.baseSourceTask.start(new HashMap());
        Assert.assertEquals((Object)BaseSourceTask.State.RUNNING, (Object)this.baseSourceTask.getTaskState());
        this.baseSourceTask.stop();
        Assert.assertEquals((Object)BaseSourceTask.State.STOPPED, (Object)this.baseSourceTask.getTaskState());
        Assert.assertEquals((long)1L, (long)this.baseSourceTask.startCount.get());
        Assert.assertEquals((long)1L, (long)this.baseSourceTask.stopCount.get());
    }

    @Test
    public void verifyTaskCanBeStartedAfterStopped() throws InterruptedException {
        this.baseSourceTask.start(new HashMap());
        Assert.assertEquals((Object)BaseSourceTask.State.RUNNING, (Object)this.baseSourceTask.getTaskState());
        this.baseSourceTask.stop();
        Assert.assertEquals((Object)BaseSourceTask.State.STOPPED, (Object)this.baseSourceTask.getTaskState());
        this.baseSourceTask.start(new HashMap());
        Assert.assertEquals((Object)BaseSourceTask.State.RUNNING, (Object)this.baseSourceTask.getTaskState());
        this.baseSourceTask.stop();
        Assert.assertEquals((Object)BaseSourceTask.State.STOPPED, (Object)this.baseSourceTask.getTaskState());
        Assert.assertEquals((long)2L, (long)this.baseSourceTask.startCount.get());
        Assert.assertEquals((long)2L, (long)this.baseSourceTask.stopCount.get());
        ((ChangeEventSourceCoordinator)Mockito.verify(this.baseSourceTask.coordinator, (VerificationMode)Mockito.times((int)2))).stop();
    }

    @Test
    public void verifyTaskRestartsSuccessfully() throws InterruptedException {
        MyBaseSourceTask baseSourceTask = new MyBaseSourceTask(){

            @Override
            protected ChangeEventSourceCoordinator<Partition, OffsetContext> start(Configuration config) {
                ChangeEventSourceCoordinator<Partition, OffsetContext> result = super.start(config);
                if (this.startCount.get() < 4) {
                    throw new RetriableException("Retry " + this.startCount.get());
                }
                return result;
            }
        };
        baseSourceTask.initialize((SourceTaskContext)Mockito.mock(SourceTaskContext.class));
        Map<String, String> config = Map.of(CommonConnectorConfig.RETRIABLE_RESTART_WAIT.name(), "1");
        baseSourceTask.start(config);
        BaseSourceTaskTest.sleep(1L);
        Assert.assertEquals((Object)BaseSourceTask.State.RESTARTING, (Object)baseSourceTask.getTaskState());
        BaseSourceTaskTest.pollAndIgnoreRetryException(baseSourceTask);
        Assert.assertEquals((Object)BaseSourceTask.State.RESTARTING, (Object)baseSourceTask.getTaskState());
        BaseSourceTaskTest.sleep(1L);
        BaseSourceTaskTest.pollAndIgnoreRetryException(baseSourceTask);
        Assert.assertEquals((Object)BaseSourceTask.State.RESTARTING, (Object)baseSourceTask.getTaskState());
        BaseSourceTaskTest.sleep(1L);
        baseSourceTask.poll();
        Assert.assertEquals((Object)BaseSourceTask.State.RUNNING, (Object)baseSourceTask.getTaskState());
        baseSourceTask.stop();
        Assert.assertEquals((Object)BaseSourceTask.State.STOPPED, (Object)baseSourceTask.getTaskState());
        Assert.assertEquals((long)4L, (long)baseSourceTask.startCount.get());
        Assert.assertEquals((long)3L, (long)baseSourceTask.stopCount.get());
        ((ChangeEventSourceCoordinator)Mockito.verify(baseSourceTask.coordinator, (VerificationMode)Mockito.times((int)1))).stop();
    }

    @Test
    public void verifyOutOfOrderPollDoesNotStartTask() throws InterruptedException {
        this.baseSourceTask.start(new HashMap());
        Assert.assertEquals((Object)BaseSourceTask.State.RUNNING, (Object)this.baseSourceTask.getTaskState());
        this.baseSourceTask.stop();
        Assert.assertEquals((Object)BaseSourceTask.State.STOPPED, (Object)this.baseSourceTask.getTaskState());
        this.baseSourceTask.poll();
        Assert.assertEquals((Object)BaseSourceTask.State.STOPPED, (Object)this.baseSourceTask.getTaskState());
        Assert.assertEquals((long)1L, (long)this.baseSourceTask.startCount.get());
        Assert.assertEquals((long)1L, (long)this.baseSourceTask.stopCount.get());
    }

    private static void pollAndIgnoreRetryException(BaseSourceTask<Partition, OffsetContext> baseSourceTask) throws InterruptedException {
        try {
            baseSourceTask.poll();
        }
        catch (RetriableException retriableException) {
            // empty catch block
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException();
        }
    }

    public static class MyBaseSourceTask
    extends BaseSourceTask<Partition, OffsetContext> {
        final List<SourceRecord> records = new ArrayList<SourceRecord>();
        final AtomicInteger startCount = new AtomicInteger();
        final AtomicInteger stopCount = new AtomicInteger();
        final ChangeEventSourceCoordinator<Partition, OffsetContext> coordinator = (ChangeEventSourceCoordinator)Mockito.mock(ChangeEventSourceCoordinator.class);

        protected ChangeEventSourceCoordinator<Partition, OffsetContext> start(Configuration config) {
            this.startCount.incrementAndGet();
            return this.coordinator;
        }

        protected List<SourceRecord> doPoll() {
            return this.records;
        }

        protected void resetErrorHandlerRetriesIfNeeded() {
        }

        protected void doStop() {
            this.stopCount.incrementAndGet();
        }

        protected Iterable<Field> getAllConfigurationFields() {
            return List.of(Field.create((String)"f1"));
        }

        public String version() {
            return "1.0";
        }
    }
}

