/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.raptor.legacy.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.units.Duration;
import io.trino.plugin.raptor.legacy.backup.BackupStore;
import io.trino.plugin.raptor.legacy.metadata.ShardCleanerConfig;
import io.trino.plugin.raptor.legacy.metadata.ShardDao;
import io.trino.plugin.raptor.legacy.metadata.ShardMetadata;
import io.trino.plugin.raptor.legacy.storage.StorageService;
import io.trino.plugin.raptor.legacy.util.DaoSupplier;
import io.trino.spi.NodeManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.File;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

public class ShardCleaner {
    private static final Logger log = Logger.get(ShardCleaner.class);
    private final ShardDao dao;
    private final String currentNode;
    private final boolean coordinator;
    private final Ticker ticker;
    private final StorageService storageService;
    private final Optional<BackupStore> backupStore;
    private final Duration maxTransactionAge;
    private final Duration transactionCleanerInterval;
    private final Duration localCleanerInterval;
    private final Duration localCleanTime;
    private final Duration backupCleanerInterval;
    private final Duration backupCleanTime;
    private final ScheduledExecutorService scheduler;
    private final ExecutorService backupExecutor;
    private final Duration maxCompletedTransactionAge;
    private final AtomicBoolean started = new AtomicBoolean();
    private final CounterStat transactionJobErrors = new CounterStat();
    private final CounterStat backupJobErrors = new CounterStat();
    private final CounterStat localJobErrors = new CounterStat();
    private final CounterStat backupShardsQueued = new CounterStat();
    private final CounterStat backupShardsCleaned = new CounterStat();
    private final CounterStat localShardsCleaned = new CounterStat();
    @GuardedBy(value="this")
    private final Map<UUID, Long> shardsToClean = new HashMap<UUID, Long>();

    @Inject
    public ShardCleaner(DaoSupplier<ShardDao> shardDaoSupplier, Ticker ticker, NodeManager nodeManager, StorageService storageService, Optional<BackupStore> backupStore, ShardCleanerConfig config) {
        this(shardDaoSupplier, nodeManager.getCurrentNode().getNodeIdentifier(), nodeManager.getCurrentNode().isCoordinator(), ticker, storageService, backupStore, config.getMaxTransactionAge(), config.getTransactionCleanerInterval(), config.getLocalCleanerInterval(), config.getLocalCleanTime(), config.getBackupCleanerInterval(), config.getBackupCleanTime(), config.getBackupDeletionThreads(), config.getMaxCompletedTransactionAge());
    }

    public ShardCleaner(DaoSupplier<ShardDao> shardDaoSupplier, String currentNode, boolean coordinator, Ticker ticker, StorageService storageService, Optional<BackupStore> backupStore, Duration maxTransactionAge, Duration transactionCleanerInterval, Duration localCleanerInterval, Duration localCleanTime, Duration backupCleanerInterval, Duration backupCleanTime, int backupDeletionThreads, Duration maxCompletedTransactionAge) {
        this.dao = shardDaoSupplier.onDemand();
        this.currentNode = Objects.requireNonNull(currentNode, "currentNode is null");
        this.coordinator = coordinator;
        this.ticker = Objects.requireNonNull(ticker, "ticker is null");
        this.storageService = Objects.requireNonNull(storageService, "storageService is null");
        this.backupStore = Objects.requireNonNull(backupStore, "backupStore is null");
        this.maxTransactionAge = Objects.requireNonNull(maxTransactionAge, "maxTransactionAge");
        this.transactionCleanerInterval = Objects.requireNonNull(transactionCleanerInterval, "transactionCleanerInterval is null");
        this.localCleanerInterval = Objects.requireNonNull(localCleanerInterval, "localCleanerInterval is null");
        this.localCleanTime = Objects.requireNonNull(localCleanTime, "localCleanTime is null");
        this.backupCleanerInterval = Objects.requireNonNull(backupCleanerInterval, "backupCleanerInterval is null");
        this.backupCleanTime = Objects.requireNonNull(backupCleanTime, "backupCleanTime is null");
        this.scheduler = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed((String)"shard-cleaner-%s"));
        this.backupExecutor = Executors.newFixedThreadPool(backupDeletionThreads, Threads.daemonThreadsNamed((String)"shard-cleaner-backup-%s"));
        this.maxCompletedTransactionAge = Objects.requireNonNull(maxCompletedTransactionAge, "maxCompletedTransactionAge is null");
    }

    @PostConstruct
    public void start() {
        if (!this.started.getAndSet(true)) {
            this.startJobs();
        }
    }

    @PreDestroy
    public void shutdown() {
        this.scheduler.shutdownNow();
        this.backupExecutor.shutdownNow();
    }

    @Managed
    @Nested
    public CounterStat getTransactionJobErrors() {
        return this.transactionJobErrors;
    }

    @Managed
    @Nested
    public CounterStat getBackupJobErrors() {
        return this.backupJobErrors;
    }

    @Managed
    @Nested
    public CounterStat getLocalJobErrors() {
        return this.localJobErrors;
    }

    @Managed
    @Nested
    public CounterStat getBackupShardsQueued() {
        return this.backupShardsQueued;
    }

    @Managed
    @Nested
    public CounterStat getBackupShardsCleaned() {
        return this.backupShardsCleaned;
    }

    @Managed
    @Nested
    public CounterStat getLocalShardsCleaned() {
        return this.localShardsCleaned;
    }

    private void startJobs() {
        if (this.coordinator) {
            this.startTransactionCleanup();
            if (this.backupStore.isPresent()) {
                this.scheduleBackupCleanup();
            }
        }
        if (this.backupStore.isPresent()) {
            this.scheduleLocalCleanup();
        }
    }

    private void startTransactionCleanup() {
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                this.abortOldTransactions();
                this.deleteOldCompletedTransactions();
                this.deleteOldShards();
            }
            catch (Throwable t) {
                log.error(t, "Error cleaning transactions");
                this.transactionJobErrors.update(1L);
            }
        }, 0L, this.transactionCleanerInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void scheduleBackupCleanup() {
        this.scheduler.scheduleWithFixedDelay(this::runBackupCleanup, 0L, this.backupCleanerInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void scheduleLocalCleanup() {
        Set<UUID> local = this.getLocalShards();
        this.scheduler.submit(() -> {
            this.waitJitterTime();
            this.runLocalCleanupImmediately(local);
        });
        this.scheduler.scheduleWithFixedDelay(() -> {
            this.waitJitterTime();
            this.runLocalCleanup();
        }, 0L, this.localCleanerInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private synchronized void runBackupCleanup() {
        try {
            this.cleanBackupShards();
        }
        catch (Throwable t) {
            log.error(t, "Error cleaning backup shards");
            this.backupJobErrors.update(1L);
        }
    }

    private void waitJitterTime() {
        try {
            long interval = this.localCleanerInterval.roundTo(TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextLong(1L, interval));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private synchronized void runLocalCleanupImmediately(Set<UUID> local) {
        try {
            this.cleanLocalShardsImmediately(local);
        }
        catch (Throwable t) {
            log.error(t, "Error cleaning local shards");
            this.localJobErrors.update(1L);
        }
    }

    private synchronized void runLocalCleanup() {
        try {
            this.cleanLocalShards();
        }
        catch (Throwable t) {
            log.error(t, "Error cleaning local shards");
            this.localJobErrors.update(1L);
        }
    }

    @Managed
    public void startBackupCleanup() {
        this.scheduler.submit(this::runBackupCleanup);
    }

    @Managed
    public void startLocalCleanup() {
        this.scheduler.submit(this::runLocalCleanup);
    }

    @Managed
    public void startLocalCleanupImmediately() {
        this.scheduler.submit(() -> this.runLocalCleanupImmediately(this.getLocalShards()));
    }

    @VisibleForTesting
    void abortOldTransactions() {
        this.dao.abortOldTransactions(ShardCleaner.maxTimestamp(this.maxTransactionAge));
    }

    @VisibleForTesting
    void deleteOldShards() {
        List<UUID> shards;
        while (!Thread.currentThread().isInterrupted() && !(shards = this.dao.getOldCreatedShardsBatch()).isEmpty()) {
            this.dao.insertDeletedShards(shards);
            this.dao.deleteCreatedShards(shards);
            this.backupShardsQueued.update((long)shards.size());
        }
    }

    @VisibleForTesting
    void deleteOldCompletedTransactions() {
        int deleted;
        while (!Thread.currentThread().isInterrupted() && (deleted = this.dao.deleteOldCompletedTransactions(ShardCleaner.maxTimestamp(this.maxCompletedTransactionAge))) >= 10000) {
        }
    }

    @VisibleForTesting
    synchronized Set<UUID> getLocalShards() {
        return this.storageService.getStorageShards();
    }

    @VisibleForTesting
    synchronized void cleanLocalShardsImmediately(Set<UUID> local) {
        Set assigned = this.dao.getNodeShards(this.currentNode, null).stream().map(ShardMetadata::getShardUuid).collect(Collectors.toSet());
        Sets.SetView deletions = Sets.difference(local, assigned);
        for (UUID uuid : deletions) {
            ShardCleaner.deleteFile(this.storageService.getStorageFile(uuid));
        }
        this.localShardsCleaned.update((long)deletions.size());
        log.info("Cleaned %s local shards immediately", new Object[]{deletions.size()});
    }

    @VisibleForTesting
    synchronized void cleanLocalShards() {
        Set<UUID> local = this.getLocalShards();
        Set assigned = this.dao.getNodeShards(this.currentNode, null).stream().map(ShardMetadata::getShardUuid).collect(Collectors.toSet());
        for (UUID uuid : assigned) {
            this.shardsToClean.remove(uuid);
        }
        for (UUID uuid : local) {
            if (assigned.contains(uuid)) continue;
            this.shardsToClean.putIfAbsent(uuid, this.ticker.read());
        }
        long threshold = this.ticker.read() - this.localCleanTime.roundTo(TimeUnit.NANOSECONDS);
        Set deletions = this.shardsToClean.entrySet().stream().filter(entry -> (Long)entry.getValue() < threshold).map(Map.Entry::getKey).collect(Collectors.toSet());
        if (deletions.isEmpty()) {
            return;
        }
        for (UUID uuid : deletions) {
            ShardCleaner.deleteFile(this.storageService.getStorageFile(uuid));
            this.shardsToClean.remove(uuid);
        }
        this.localShardsCleaned.update((long)deletions.size());
        log.info("Cleaned %s local shards", new Object[]{deletions.size()});
    }

    @VisibleForTesting
    void cleanBackupShards() {
        Set processing = Sets.newConcurrentHashSet();
        LinkedBlockingQueue completed = new LinkedBlockingQueue();
        boolean fill = true;
        while (!Thread.currentThread().isInterrupted()) {
            Object uuids = ImmutableSet.of();
            if (fill && processing.size() < 1000) {
                uuids = this.dao.getCleanableShardsBatch(ShardCleaner.maxTimestamp(this.backupCleanTime));
                fill = false;
            }
            if (uuids.isEmpty() && processing.isEmpty()) break;
            uuids = ImmutableSet.copyOf((Collection)Sets.difference((Set)uuids, (Set)processing));
            processing.addAll(uuids);
            Iterator iterator = uuids.iterator();
            while (iterator.hasNext()) {
                UUID uuid = (UUID)iterator.next();
                ((CompletableFuture)CompletableFuture.runAsync(() -> this.backupStore.get().deleteShard(uuid), this.backupExecutor).thenAccept(v -> completed.add(uuid))).whenComplete((v, e) -> {
                    if (e != null) {
                        log.error(e, "Error cleaning backup shard: %s", new Object[]{uuid});
                        this.backupJobErrors.update(1L);
                        processing.remove(uuid);
                    }
                });
            }
            int desired = Math.min(100, processing.size());
            Collection<UUID> done = ShardCleaner.drain(completed, desired, 100L, TimeUnit.MILLISECONDS);
            if (done.isEmpty()) continue;
            processing.removeAll(done);
            this.dao.deleteCleanedShards(done);
            this.backupShardsCleaned.update((long)done.size());
            fill = true;
        }
    }

    private static <T> Collection<T> drain(BlockingQueue<T> queue, int desired, long timeout, TimeUnit unit) {
        long start = System.nanoTime();
        ArrayList<T> result = new ArrayList<T>();
        while (true) {
            queue.drainTo(result);
            if (result.size() >= desired) {
                return result;
            }
            long elapsedNanos = System.nanoTime() - start;
            long remainingNanos = unit.toNanos(timeout) - elapsedNanos;
            if (remainingNanos <= 0L) {
                return result;
            }
            try {
                T value = queue.poll(remainingNanos, TimeUnit.NANOSECONDS);
                if (value == null) continue;
                result.add(value);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return result;
            }
        }
    }

    private static Timestamp maxTimestamp(Duration duration) {
        return new Timestamp(System.currentTimeMillis() - duration.toMillis());
    }

    private static void deleteFile(File file) {
        file.delete();
    }
}

