/*
 * Decompiled with CFR 0.152.
 */
package com.scylladb.cdc.model.worker;

import com.google.common.flogger.FluentLogger;
import com.scylladb.cdc.model.GenerationId;
import com.scylladb.cdc.model.StreamId;
import com.scylladb.cdc.model.TableName;
import com.scylladb.cdc.model.TaskId;
import com.scylladb.cdc.model.Timestamp;
import com.scylladb.cdc.model.worker.Task;
import com.scylladb.cdc.model.worker.TaskAction;
import com.scylladb.cdc.model.worker.TaskState;
import com.scylladb.cdc.model.worker.WorkerConfiguration;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import shaded.com.scylladb.cdc.driver3.common.base.Preconditions;

public final class Worker {
    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
    private final WorkerConfiguration workerConfiguration;
    private volatile boolean shouldStop = false;

    public Worker(WorkerConfiguration workerConfiguration) {
        this.workerConfiguration = Preconditions.checkNotNull(workerConfiguration);
    }

    private static GenerationId getGenerationIdOfStreams(Map<TaskId, SortedSet<StreamId>> groupedStreams) {
        return groupedStreams.entrySet().stream().map(e -> ((TaskId)e.getKey()).getGenerationId()).findAny().get();
    }

    private static TaskState getInitialStateForStreams(Map<TaskId, SortedSet<StreamId>> groupedStreams, long windowSizeMs) {
        return TaskState.createInitialFor(Worker.getGenerationIdOfStreams(groupedStreams), windowSizeMs);
    }

    private Stream<Task> createTasksWithState(Map<TaskId, SortedSet<StreamId>> groupedStreams) throws ExecutionException, InterruptedException {
        Map<TaskId, TaskState> states = this.workerConfiguration.transport.getTaskStates(groupedStreams.keySet());
        TaskState initialState = Worker.getInitialStateForStreams(groupedStreams, this.workerConfiguration.queryTimeWindowSizeMs);
        Set tableNames = groupedStreams.keySet().stream().map(TaskId::getTable).collect(Collectors.toSet());
        Date now = Date.from(this.workerConfiguration.getClock().instant());
        HashMap<TableName, Timestamp> minimumWindowStarts = new HashMap<TableName, Timestamp>();
        for (TableName tableName : tableNames) {
            Optional<Long> ttl = this.workerConfiguration.cql.fetchTableTTL(tableName).get();
            Date minimumWindowStart = new Date(0L);
            if (ttl.isPresent()) {
                minimumWindowStart = new Date(now.getTime() - 1000L * ttl.get());
            }
            minimumWindowStarts.put(tableName, new Timestamp(minimumWindowStart));
        }
        return groupedStreams.entrySet().stream().map(taskStreams -> {
            TaskId id = (TaskId)taskStreams.getKey();
            SortedSet streams = (SortedSet)taskStreams.getValue();
            TaskState state = states.getOrDefault(id, initialState);
            state = state.trimTaskState((Timestamp)minimumWindowStarts.get(id.getTable()), this.workerConfiguration.queryTimeWindowSizeMs);
            return new Task(id, streams, state);
        });
    }

    private Collection<TaskAction> queueFirstActionForEachTask(Map<TaskId, SortedSet<StreamId>> groupedStreams) throws ExecutionException, InterruptedException {
        return this.createTasksWithState(groupedStreams).map(task -> TaskAction.createFirstAction(this.workerConfiguration, task)).collect(Collectors.toSet());
    }

    private boolean shouldStop() {
        return this.shouldStop;
    }

    private ScheduledExecutorService getExecutorService() {
        return this.workerConfiguration.getExecutorService();
    }

    private Callable<Object> makeCallable(TaskAction a) {
        return () -> a.run().handle((na, ex) -> {
            if (ex != null) {
                ((FluentLogger.Api)((FluentLogger.Api)logger.atSevere()).withCause(ex)).log("Unhandled exception in Worker.");
            } else if (!this.shouldStop()) {
                this.getExecutorService().submit(this.makeCallable((TaskAction)na));
            }
            return null;
        });
    }

    private void performActionsUntilStopRequested(Collection<TaskAction> actions) {
        if (this.shouldStop()) {
            return;
        }
        ScheduledExecutorService executorService = this.getExecutorService();
        try {
            executorService.invokeAll(actions.stream().map(a -> this.makeCallable((TaskAction)a)).collect(Collectors.toSet()));
            do {
                executorService.awaitTermination(50L, TimeUnit.MILLISECONDS);
                if (!this.workerConfiguration.transport.shouldStop()) continue;
                this.stop();
            } while (!this.shouldStop() && !executorService.isTerminated());
        }
        catch (InterruptedException e) {
            ((FluentLogger.Api)logger.atWarning()).log("Worker interrupted");
        }
    }

    public void stop() {
        this.shouldStop = true;
        this.getExecutorService().shutdown();
    }

    public void run(Map<TaskId, SortedSet<StreamId>> groupedStreams) throws InterruptedException, ExecutionException {
        Preconditions.checkNotNull(groupedStreams);
        Preconditions.checkArgument(!groupedStreams.isEmpty(), "No tasks");
        Preconditions.checkArgument(groupedStreams.entrySet().stream().noneMatch(e -> ((SortedSet)e.getValue()).isEmpty()), "Task with no streams");
        Preconditions.checkArgument(groupedStreams.keySet().stream().map(TaskId::getGenerationId).distinct().count() == 1L, "Tasks from different generations");
        this.workerConfiguration.cql.prepare(groupedStreams.keySet().stream().map(TaskId::getTable).collect(Collectors.toSet()));
        Collection<TaskAction> actions = this.queueFirstActionForEachTask(groupedStreams);
        this.performActionsUntilStopRequested(actions);
    }
}

