/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake.metastore;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.metastore.Table;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.MaxTableParameterLength;
import io.trino.plugin.deltalake.metastore.DeltaLakeTableOperationsProvider;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.spi.NodeManager;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.type.TypeManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import org.weakref.jmx.Managed;

public class DeltaLakeTableMetadataScheduler {
    private static final Logger log = Logger.get(DeltaLakeTableMetadataScheduler.class);
    private static final String TRINO_LAST_TRANSACTION_VERSION = "trino_last_transaction_version";
    private static final String TRINO_METADATA_SCHEMA_STRING = "trino_metadata_schema_string";
    private static final int MAX_FAILED_COUNTS = 10;
    private final DeltaLakeTableOperationsProvider tableOperationsProvider;
    private final TypeManager typeManager;
    private final int tableParameterLengthLimit;
    private final int storeTableMetadataThreads;
    private final Map<SchemaTableName, TableUpdateInfo> updateInfos = new ConcurrentHashMap<SchemaTableName, TableUpdateInfo>();
    private final boolean enabled;
    private final Duration scheduleInterval;
    private ExecutorService executor;
    private ScheduledExecutorService scheduler;
    private final AtomicInteger failedCounts = new AtomicInteger();

    @Inject
    public DeltaLakeTableMetadataScheduler(NodeManager nodeManager, TypeManager typeManager, DeltaLakeTableOperationsProvider tableOperationsProvider, @MaxTableParameterLength int tableParameterLengthLimit, DeltaLakeConfig config) {
        this.typeManager = Objects.requireNonNull(typeManager, "typeManager is null");
        this.tableOperationsProvider = Objects.requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
        this.tableParameterLengthLimit = tableParameterLengthLimit;
        this.storeTableMetadataThreads = config.getStoreTableMetadataThreads();
        Objects.requireNonNull(nodeManager, "nodeManager is null");
        this.enabled = config.isStoreTableMetadataEnabled() && nodeManager.getCurrentNode().isCoordinator();
        this.scheduleInterval = config.getStoreTableMetadataInterval();
    }

    @Managed
    public boolean isShutdown() {
        return this.scheduler.isShutdown();
    }

    public void putAll(Map<SchemaTableName, TableUpdateInfo> tableParameters) {
        if (!this.enabled || this.scheduler.isShutdown()) {
            log.debug("Scheduler is already shutdown, skipping the update: %s", new Object[]{tableParameters});
            return;
        }
        this.updateInfos.putAll(tableParameters);
    }

    @PostConstruct
    public void start() {
        if (this.enabled) {
            this.executor = this.storeTableMetadataThreads == 0 ? MoreExecutors.newDirectExecutorService() : Executors.newFixedThreadPool(this.storeTableMetadataThreads, Threads.threadsNamed((String)"store-table-metadata-%s"));
            this.scheduler = Executors.newSingleThreadScheduledExecutor(Threads.daemonThreadsNamed((String)"store-table-metadata"));
            this.scheduler.scheduleWithFixedDelay(() -> {
                try {
                    this.process();
                }
                catch (Throwable e) {
                    log.warn(e, "Error storing table metadata");
                }
                try {
                    this.checkFailedTasks();
                }
                catch (Throwable e) {
                    log.warn(e, "Error canceling metadata update tasks");
                }
            }, 200L, this.scheduleInterval.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public void process() {
        ArrayList<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
        DeltaLakeTableMetadataScheduler deltaLakeTableMetadataScheduler = this;
        synchronized (deltaLakeTableMetadataScheduler) {
            if (this.updateInfos.isEmpty()) {
                return;
            }
            Map updateTables = (Map)this.updateInfos.entrySet().stream().collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue, BinaryOperator.maxBy(Comparator.comparing(TableUpdateInfo::version))));
            log.debug("Processing %s table(s): %s", new Object[]{updateTables.size(), updateTables.keySet()});
            for (Map.Entry entry : updateTables.entrySet()) {
                tasks.add(() -> {
                    this.updateTable((SchemaTableName)entry.getKey(), (TableUpdateInfo)entry.getValue());
                    return null;
                });
            }
            this.updateInfos.clear();
        }
        try {
            this.executor.invokeAll(tasks).forEach(MoreFutures::getDone);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private void updateTable(SchemaTableName schemaTableName, TableUpdateInfo info) {
        log.debug("Updating table: '%s'", new Object[]{schemaTableName});
        try {
            this.tableOperationsProvider.createTableOperations(info.session).commitToExistingTable(schemaTableName, info.version, info.schemaString, info.tableComment);
            log.debug("Replaced table: '%s'", new Object[]{schemaTableName});
        }
        catch (TableNotFoundException e) {
            log.debug("Table disappeared during metadata updating operation: '%s'", new Object[]{schemaTableName});
        }
        catch (Exception e) {
            log.warn((Throwable)e, "Failed to store table metadata for '%s'", new Object[]{schemaTableName});
            this.failedCounts.incrementAndGet();
        }
    }

    private void checkFailedTasks() {
        if (this.failedCounts.get() > 10) {
            log.warn("Too many failed tasks, stopping the scheduler");
            this.stop();
        }
    }

    @VisibleForTesting
    public void clear() {
        this.updateInfos.clear();
    }

    @PreDestroy
    public void stop() {
        if (this.enabled) {
            this.scheduler.shutdownNow();
            this.executor.shutdownNow();
        }
    }

    public static boolean isSameTransactionVersion(Table table, TableSnapshot snapshot) {
        return DeltaLakeTableMetadataScheduler.getLastTransactionVersion(table).map(version -> version.longValue() == snapshot.getVersion()).orElse(false);
    }

    public static Optional<Long> getLastTransactionVersion(Table table) {
        String version = (String)table.getParameters().get(TRINO_LAST_TRANSACTION_VERSION);
        return Optional.ofNullable(version).map(Long::parseLong);
    }

    public static boolean containsSchemaString(Table table) {
        return table.getParameters().containsKey(TRINO_METADATA_SCHEMA_STRING);
    }

    public List<ColumnMetadata> getColumnsMetadata(Table table) {
        String schemaString = (String)table.getParameters().get(TRINO_METADATA_SCHEMA_STRING);
        return (List)DeltaLakeSchemaSupport.getColumnMetadata(schemaString, this.typeManager, DeltaLakeSchemaSupport.ColumnMappingMode.NONE, (List<String>)ImmutableList.of()).stream().map(DeltaLakeColumnMetadata::columnMetadata).collect(ImmutableList.toImmutableList());
    }

    public boolean canStoreTableMetadata(ConnectorSession session, String schemaString, Optional<String> tableComment) {
        return DeltaLakeSessionProperties.isStoreTableMetadataInMetastoreEnabled(session) && schemaString.length() <= this.tableParameterLengthLimit && tableComment.map(String::length).orElse(0) <= this.tableParameterLengthLimit;
    }

    public static Map<String, String> tableMetadataParameters(long version, String schemaString, Optional<String> tableComment) {
        ImmutableMap.Builder parameters = ImmutableMap.builder();
        tableComment.ifPresent(comment -> parameters.put((Object)"comment", comment));
        parameters.put((Object)TRINO_LAST_TRANSACTION_VERSION, (Object)Long.toString(version));
        parameters.put((Object)TRINO_METADATA_SCHEMA_STRING, (Object)schemaString);
        return parameters.buildOrThrow();
    }

    public record TableUpdateInfo(ConnectorSession session, long version, String schemaString, Optional<String> tableComment) {
        public TableUpdateInfo {
            Objects.requireNonNull(session, "session is null");
            Objects.requireNonNull(schemaString, "schemaString is null");
            Objects.requireNonNull(tableComment, "tableComment is null");
        }
    }
}

