/*
 * Decompiled with CFR 0.152.
 */
package com.scylladb.cdc.model.worker;

import com.google.common.flogger.FluentLogger;
import com.scylladb.cdc.cql.WorkerCQL;
import com.scylladb.cdc.model.FutureUtils;
import com.scylladb.cdc.model.worker.RawChange;
import com.scylladb.cdc.model.worker.Task;
import com.scylladb.cdc.model.worker.TaskState;
import com.scylladb.cdc.model.worker.WorkerConfiguration;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import shaded.com.scylladb.cdc.driver3.common.base.Preconditions;

abstract class TaskAction {
    /** Logger shared by this class and its nested action implementations. */
    private static final FluentLogger logger = FluentLogger.forEnclosingClass();

    /**
     * Worker-wide settings and collaborators; the actions in this file read the executor
     * service, clock, transport, consumer, CQL access and retry backoff from it. Never null.
     */
    protected final WorkerConfiguration workerConfiguration;

    /** The task (id plus state) this action operates on. Never null. */
    protected final Task task;

    /**
     * Stores the configuration and task that every concrete action works with.
     *
     * @param workerConfiguration worker configuration; must not be null
     * @param task task this action operates on; must not be null
     * @throws NullPointerException if either argument is null (the message names the argument)
     */
    protected TaskAction(WorkerConfiguration workerConfiguration, Task task) {
        // Pass a message so that a failed check identifies which argument was missing.
        this.workerConfiguration = Preconditions.checkNotNull(workerConfiguration, "workerConfiguration must not be null");
        this.task = Preconditions.checkNotNull(task, "task must not be null");
    }

    /**
     * Produces a future that is completed (with {@code null}) by a callback scheduled to run
     * after the given delay on the executor service obtained from {@code workerConfiguration}.
     *
     * @param millis delay in milliseconds, passed unchanged to the scheduler
     * @return the future that the scheduled callback completes
     */
    protected CompletableFuture<Void> delay(long millis) {
        final CompletableFuture<Void> elapsed = new CompletableFuture<>();
        this.workerConfiguration
                .getExecutorService()
                .schedule(() -> elapsed.complete(null), millis, TimeUnit.MILLISECONDS);
        return elapsed;
    }

    /**
     * Performs this step of task processing.
     *
     * <p>Every implementation in this file yields the follow-up {@code TaskAction} through the
     * returned future; presumably the caller runs that action next (the driving loop is not
     * part of this file — confirm against the caller).
     *
     * @return a future yielding the action produced by this step
     */
    public abstract CompletableFuture<TaskAction> run();

    /**
     * Builds the entry action for a task: a {@code ReadNewWindowTaskAction} whose retry
     * counter starts at zero.
     *
     * @param workerConfiguration worker configuration handed to the action
     * @param task task the action will operate on
     * @return the initial action for {@code task}
     */
    public static TaskAction createFirstAction(WorkerConfiguration workerConfiguration, Task task) {
        final int initialTryAttempt = 0;
        return new ReadNewWindowTaskAction(workerConfiguration, task, initialTryAttempt);
    }

    private static final class MoveToNextWindowTaskAction
    extends TaskAction {
        public MoveToNextWindowTaskAction(WorkerConfiguration workerConfiguration, Task task) {
            super(workerConfiguration, task);
        }

        @Override
        public CompletableFuture<TaskAction> run() {
            TaskState newState = this.task.state.moveToNextWindow(this.workerConfiguration.queryTimeWindowSizeMs);
            this.workerConfiguration.transport.moveStateToNextWindow(this.task.id, newState);
            Task newTask = this.task.updateState(newState);
            return CompletableFuture.completedFuture(new ReadNewWindowTaskAction(this.workerConfiguration, newTask, 0));
        }
    }

    private static final class ConsumeChangeTaskAction
    extends TaskAction {
        private final WorkerCQL.Reader reader;
        private final Optional<RawChange> change;
        private final int tryAttempt;

        public ConsumeChangeTaskAction(WorkerConfiguration workerConfiguration, Task task, WorkerCQL.Reader reader, Optional<RawChange> change, int tryAttempt) {
            super(workerConfiguration, task);
            this.reader = Preconditions.checkNotNull(reader);
            this.change = Preconditions.checkNotNull(change);
            Preconditions.checkArgument(tryAttempt >= 0);
            this.tryAttempt = tryAttempt;
        }

        private CompletableFuture<TaskAction> onException(Throwable ex) {
            long backoffTime = this.workerConfiguration.workerRetryBackoff.getRetryBackoffTimeMs(this.tryAttempt);
            ((FluentLogger.Api)((FluentLogger.Api)logger.atSevere()).withCause(ex)).log("Error while executing consume() method provided to the library. Task: %s. Task state: %s. Will retry after backoff (%d ms).", (Object)this.task.id, (Object)this.task.state, (Object)backoffTime);
            return this.delay(backoffTime).thenApply(t2 -> new ReadNewWindowTaskAction(this.workerConfiguration, this.task, this.tryAttempt + 1));
        }

        @Override
        public CompletableFuture<TaskAction> run() {
            if (this.change.isPresent()) {
                Task updatedTask = this.change.get().isEndOfBatch() ? this.task.updateState(this.change.get().getId()) : this.task;
                try {
                    CompletionStage taskActionFuture = this.workerConfiguration.consumer.getConsumerDispatch().consume(this.task, this.change.get(), updatedTask).thenApply(newState -> new ReadChangeTaskAction(this.workerConfiguration, updatedTask, this.reader, this.tryAttempt, (TaskState)newState));
                    return FutureUtils.thenComposeExceptionally(taskActionFuture, this::onException);
                }
                catch (Throwable ex) {
                    return this.onException(ex);
                }
            }
            if (this.tryAttempt > 0) {
                ((FluentLogger.Api)logger.atWarning()).log("Successfully finished reading a window after %d tries. Task: %s. Task state: %s.", (Object)this.tryAttempt, (Object)this.task.id, (Object)this.task.state);
            }
            return CompletableFuture.completedFuture(new MoveToNextWindowTaskAction(this.workerConfiguration, this.task));
        }
    }

    private static class ReadChangeTaskAction
    extends TaskAction {
        private final WorkerCQL.Reader reader;
        private final int tryAttempt;
        private final TaskState newState;

        public ReadChangeTaskAction(WorkerConfiguration workerConfiguration, Task task, WorkerCQL.Reader reader, int tryAttempt, TaskState newState) {
            super(workerConfiguration, task);
            this.reader = Preconditions.checkNotNull(reader);
            Preconditions.checkArgument(tryAttempt >= 0);
            this.tryAttempt = tryAttempt;
            this.newState = newState;
        }

        private CompletableFuture<TaskAction> onException(Throwable ex) {
            long backoffTime = this.workerConfiguration.workerRetryBackoff.getRetryBackoffTimeMs(this.tryAttempt);
            ((FluentLogger.Api)((FluentLogger.Api)logger.atSevere()).withCause(ex)).log("Error while reading a CDC change. Task: %s. Task state: %s. Will retry after backoff (%d ms).", (Object)this.task.id, (Object)this.task.state, (Object)backoffTime);
            return this.delay(backoffTime).thenApply(t2 -> new ReadNewWindowTaskAction(this.workerConfiguration, this.task, this.tryAttempt + 1));
        }

        @Override
        public CompletableFuture<TaskAction> run() {
            if (this.newState != null) {
                this.workerConfiguration.transport.setState(this.task.id, this.newState);
            }
            try {
                CompletionStage taskActionFuture = this.reader.nextChange().thenApply(change -> new ConsumeChangeTaskAction(this.workerConfiguration, this.task, this.reader, (Optional<RawChange>)change, this.tryAttempt));
                return FutureUtils.thenComposeExceptionally(taskActionFuture, this::onException);
            }
            catch (Throwable ex) {
                return this.onException(ex);
            }
        }
    }

    private static final class ReadNewWindowTaskAction
    extends TaskAction {
        private final int tryAttempt;

        protected ReadNewWindowTaskAction(WorkerConfiguration workerConfiguration, Task task, int tryAttempt) {
            super(workerConfiguration, task);
            Preconditions.checkArgument(tryAttempt >= 0);
            this.tryAttempt = tryAttempt;
        }

        private CompletableFuture<TaskAction> onException(Throwable ex) {
            long backoffTime = this.workerConfiguration.workerRetryBackoff.getRetryBackoffTimeMs(this.tryAttempt);
            ((FluentLogger.Api)((FluentLogger.Api)logger.atSevere()).withCause(ex)).log("Error while starting reading next window. Task: %s. Task state: %s. Will retry after backoff (%d ms).", (Object)this.task.id, (Object)this.task.state, (Object)backoffTime);
            return this.delay(backoffTime).thenApply(t2 -> new ReadNewWindowTaskAction(this.workerConfiguration, this.task, this.tryAttempt + 1));
        }

        @Override
        public CompletableFuture<TaskAction> run() {
            try {
                CompletableFuture<Void> waitFuture = this.waitForWindow();
                CompletionStage readerFuture = waitFuture.thenCompose(w -> this.workerConfiguration.cql.createReader(this.task));
                CompletionStage taskActionFuture = ((CompletableFuture)readerFuture).thenApply(reader -> new ReadChangeTaskAction(this.workerConfiguration, this.task, (WorkerCQL.Reader)reader, this.tryAttempt, this.task.state));
                return FutureUtils.thenComposeExceptionally(taskActionFuture, this::onException);
            }
            catch (Throwable ex) {
                return this.onException(ex);
            }
        }

        private CompletableFuture<Void> waitForWindow() {
            Date end = this.task.state.getWindowEndTimestamp().toDate();
            Date now = Date.from(this.workerConfiguration.getClock().instant());
            long toWait = end.getTime() - now.getTime() + this.workerConfiguration.confidenceWindowSizeMs;
            if ((toWait = Long.max(toWait, this.workerConfiguration.minimalWaitForWindowMs)) > 0L) {
                return this.delay(toWait);
            }
            return CompletableFuture.completedFuture(null);
        }
    }
}

