/*
 * Decompiled with CFR 0.152.
 */
package com.scylladb.cdc.model.master;

import com.google.common.base.Preconditions;
import com.google.common.flogger.FluentLogger;
import com.scylladb.cdc.model.GenerationId;
import com.scylladb.cdc.model.StreamId;
import com.scylladb.cdc.model.TableName;
import com.scylladb.cdc.model.TaskId;
import com.scylladb.cdc.model.Timestamp;
import com.scylladb.cdc.model.master.GenerationMetadata;
import com.scylladb.cdc.model.master.MasterConfiguration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class Master {
    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
    private final MasterConfiguration masterConfiguration;

    public Master(MasterConfiguration masterConfiguration) {
        this.masterConfiguration = (MasterConfiguration)Preconditions.checkNotNull((Object)masterConfiguration);
    }

    private GenerationId getGenerationId() throws InterruptedException, ExecutionException {
        Optional<GenerationId> generationId = this.masterConfiguration.transport.getCurrentGenerationId();
        if (generationId.isPresent()) {
            return generationId.get();
        }
        while (!(generationId = this.masterConfiguration.cql.fetchFirstGenerationId().get()).isPresent()) {
            Thread.sleep(this.masterConfiguration.sleepBeforeFirstGenerationMs);
        }
        return generationId.get();
    }

    private boolean generationDone(GenerationMetadata generation, Set<TaskId> tasks) throws ExecutionException, InterruptedException {
        if (!generation.isClosed()) {
            return false;
        }
        if (this.generationTTLExpired(generation)) {
            return true;
        }
        return this.masterConfiguration.transport.areTasksFullyConsumedUntil(tasks, generation.getEnd().get());
    }

    private boolean generationTTLExpired(GenerationMetadata generation) throws ExecutionException, InterruptedException {
        Date now = Date.from(this.masterConfiguration.clock.instant());
        ArrayList<Optional> tablesTTL = new ArrayList<Optional>();
        for (TableName table : this.masterConfiguration.tables) {
            Optional ttl = (Optional)((CompletableFuture)this.masterConfiguration.cql.fetchTableTTL(table).exceptionally(ex -> {
                ((FluentLogger.Api)((FluentLogger.Api)logger.atSevere()).withCause(ex)).log("Error while fetching TTL value for table %s.%s", (Object)table.keyspace, (Object)table.name);
                return Optional.empty();
            })).get();
            tablesTTL.add(ttl);
        }
        Date lastVisibleChanges = tablesTTL.stream().map(t -> t.map(ttl -> new Date(now.getTime() - 1000L * ttl)).orElse(new Date(0L))).min(Comparator.naturalOrder()).orElse(new Date(0L));
        return lastVisibleChanges.after(generation.getEnd().get().toDate());
    }

    private GenerationMetadata getNextGeneration(GenerationMetadata generation) throws InterruptedException, ExecutionException {
        return this.masterConfiguration.cql.fetchGenerationMetadata(generation.getNextGenerationId().get()).get();
    }

    private Map<TaskId, SortedSet<StreamId>> createTasks(GenerationMetadata generation) {
        SortedSet<StreamId> streams = generation.getStreams();
        HashMap<TaskId, SortedSet<StreamId>> tasks = new HashMap<TaskId, SortedSet<StreamId>>();
        for (StreamId s : streams) {
            for (TableName t : this.masterConfiguration.tables) {
                TaskId taskId = new TaskId(generation.getId(), s.getVNodeId(), t);
                tasks.computeIfAbsent(taskId, id -> new TreeSet()).add(s);
            }
        }
        return tasks;
    }

    private GenerationMetadata refreshEnd(GenerationMetadata generation) throws InterruptedException, ExecutionException {
        Optional<Timestamp> end = this.masterConfiguration.cql.fetchGenerationEnd(generation.getId()).get();
        return end.isPresent() ? generation.withEnd(end.get()) : generation;
    }

    public void run() {
        while (!Thread.interrupted()) {
            try {
                this.runUntilException();
            }
            catch (Exception ex) {
                ((FluentLogger.Api)((FluentLogger.Api)logger.atSevere()).withCause((Throwable)ex)).log("Got an Exception inside Master. Will attempt to retry after a back-off time.");
            }
            try {
                Thread.sleep(this.masterConfiguration.sleepAfterExceptionMs);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public Optional<Throwable> validate() {
        try {
            for (TableName table : this.masterConfiguration.tables) {
                Optional<Throwable> tableValidation = this.masterConfiguration.cql.validateTable(table).get();
                if (!tableValidation.isPresent()) continue;
                return tableValidation;
            }
        }
        catch (InterruptedException | ExecutionException ex) {
            return Optional.of(ex);
        }
        return Optional.empty();
    }

    private void runUntilException() throws ExecutionException {
        try {
            GenerationId generationId = this.getGenerationId();
            GenerationMetadata generation = this.masterConfiguration.cql.fetchGenerationMetadata(generationId).get();
            Map<TaskId, SortedSet<StreamId>> tasks = this.createTasks(generation);
            while (!Thread.interrupted()) {
                while (this.generationDone(generation, tasks.keySet())) {
                    generation = this.getNextGeneration(generation);
                    tasks = this.createTasks(generation);
                }
                ((FluentLogger.Api)logger.atInfo()).log("Master found a new generation: %s. Will call transport.configureWorkers().", (Object)generation.getId());
                tasks.forEach((task, streams) -> ((FluentLogger.Api)logger.atFine()).log("Created Task: %s with streams: %s", task, streams));
                this.masterConfiguration.transport.configureWorkers(tasks);
                while (!this.generationDone(generation, tasks.keySet())) {
                    Thread.sleep(this.masterConfiguration.sleepBeforeGenerationDoneMs);
                    if (generation.isClosed()) continue;
                    generation = this.refreshEnd(generation);
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

