/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.jmx.CacheStatsMBean;
import io.airlift.log.Logger;
import io.airlift.slice.SizeOf;
import io.trino.collect.cache.EvictableCacheBuilder;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeErrorCode;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CanonicalColumnName;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeDataFileCacheEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogUtil;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.VarbinaryType;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.inject.Inject;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

public class TransactionLogAccess {
    private static final Logger log = Logger.get(TransactionLogAccess.class);
    private final TypeManager typeManager;
    private final CheckpointSchemaManager checkpointSchemaManager;
    private final FileFormatDataSourceStats fileFormatDataSourceStats;
    private final HdfsEnvironment hdfsEnvironment;
    private final ParquetReaderOptions parquetReaderOptions;
    private final Cache<String, TableSnapshot> tableSnapshots;
    private final Cache<String, DeltaLakeDataFileCacheEntry> activeDataFileCache;
    private final boolean checkpointRowStatisticsWritingEnabled;

    @Inject
    public TransactionLogAccess(TypeManager typeManager, CheckpointSchemaManager checkpointSchemaManager, DeltaLakeConfig deltaLakeConfig, FileFormatDataSourceStats fileFormatDataSourceStats, HdfsEnvironment hdfsEnvironment, ParquetReaderConfig parquetReaderConfig) {
        this.typeManager = Objects.requireNonNull(typeManager, "typeManager is null");
        this.checkpointSchemaManager = Objects.requireNonNull(checkpointSchemaManager, "checkpointSchemaManager is null");
        this.fileFormatDataSourceStats = Objects.requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
        this.hdfsEnvironment = Objects.requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.parquetReaderOptions = Objects.requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
        Objects.requireNonNull(deltaLakeConfig, "deltaLakeConfig is null");
        this.checkpointRowStatisticsWritingEnabled = deltaLakeConfig.isCheckpointRowStatisticsWritingEnabled();
        this.tableSnapshots = EvictableCacheBuilder.newBuilder().expireAfterWrite(deltaLakeConfig.getMetadataCacheTtl().toMillis(), TimeUnit.MILLISECONDS).recordStats().build();
        this.activeDataFileCache = EvictableCacheBuilder.newBuilder().weigher((key, value) -> Ints.saturatedCast((long)(SizeOf.estimatedSizeOf((String)key) + value.getRetainedSizeInBytes()))).maximumWeight(deltaLakeConfig.getDataFileCacheSize().toBytes()).recordStats().build();
    }

    @Managed
    @Nested
    public CacheStatsMBean getDataFileMetadataCacheStats() {
        return new CacheStatsMBean(this.activeDataFileCache);
    }

    @Managed
    @Nested
    public CacheStatsMBean getMetadataCacheStats() {
        return new CacheStatsMBean(this.tableSnapshots);
    }

    public TableSnapshot loadSnapshot(SchemaTableName table, Path tableLocation, ConnectorSession session) throws IOException {
        TableSnapshot snapshot;
        String location = tableLocation.toString();
        TableSnapshot cachedSnapshot = (TableSnapshot)this.tableSnapshots.getIfPresent((Object)location);
        FileSystem fileSystem = this.getFileSystem(tableLocation, table, session);
        if (cachedSnapshot == null) {
            try {
                snapshot = (TableSnapshot)this.tableSnapshots.get((Object)location, () -> TableSnapshot.load(table, fileSystem, tableLocation, this.parquetReaderOptions, this.checkpointRowStatisticsWritingEnabled));
            }
            catch (UncheckedExecutionException | ExecutionException e) {
                Throwables.throwIfUnchecked((Throwable)e.getCause());
                throw new RuntimeException(e);
            }
        } else {
            Optional<TableSnapshot> updatedSnapshot = cachedSnapshot.getUpdatedSnapshot(fileSystem);
            if (updatedSnapshot.isPresent()) {
                snapshot = updatedSnapshot.get();
                this.tableSnapshots.asMap().replace(location, cachedSnapshot, snapshot);
            } else {
                snapshot = cachedSnapshot;
            }
        }
        return snapshot;
    }

    public void invalidateCaches(String tableLocation) {
        this.tableSnapshots.invalidate((Object)tableLocation);
        this.activeDataFileCache.invalidate((Object)tableLocation);
    }

    public Optional<MetadataEntry> getMetadataEntry(TableSnapshot tableSnapshot, ConnectorSession session) {
        FileSystem fileSystem = this.getFileSystem(tableSnapshot, session);
        if (tableSnapshot.getCachedMetadata().isEmpty()) {
            try (Stream metadataEntries = this.getEntries(tableSnapshot, CheckpointEntryIterator.EntryType.METADATA, (Stream<DeltaLakeTransactionLogEntry> entryStream) -> entryStream.map(DeltaLakeTransactionLogEntry::getMetaData).filter(Objects::nonNull), session, fileSystem, this.hdfsEnvironment, this.fileFormatDataSourceStats);){
                tableSnapshot.setCachedMetadata(metadataEntries.reduce((first, second) -> second));
            }
        }
        return tableSnapshot.getCachedMetadata();
    }

    public List<AddFileEntry> getActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session) {
        try {
            String tableLocation = tableSnapshot.getTableLocation().toString();
            FileSystem fileSystem = this.getFileSystem(tableSnapshot, session);
            DeltaLakeDataFileCacheEntry cachedTable = (DeltaLakeDataFileCacheEntry)this.activeDataFileCache.get((Object)tableLocation, () -> {
                List<AddFileEntry> activeFiles = this.loadActiveFiles(tableSnapshot, session, fileSystem);
                return new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles);
            });
            if (cachedTable.getVersion() > tableSnapshot.getVersion()) {
                log.warn("Query run with outdated Transaction Log Snapshot, retrieved stale table entries for table: %s and query %s", new Object[]{tableSnapshot.getTable(), session.getQueryId()});
                return this.loadActiveFiles(tableSnapshot, session, fileSystem);
            }
            if (cachedTable.getVersion() < tableSnapshot.getVersion()) {
                List<DeltaLakeTransactionLogEntry> newEntries = this.getJsonEntries(cachedTable.getVersion(), tableSnapshot.getVersion(), tableSnapshot, fileSystem);
                DeltaLakeDataFileCacheEntry updatedCacheEntry = cachedTable.withUpdatesApplied(newEntries, tableSnapshot.getVersion());
                this.activeDataFileCache.asMap().replace(tableLocation, cachedTable, updatedCacheEntry);
                cachedTable = updatedCacheEntry;
            }
            return cachedTable.getActiveFiles();
        }
        catch (UncheckedExecutionException | IOException | ExecutionException e) {
            throw new TrinoException((ErrorCodeSupplier)DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Failed accessing transaction log for table: " + tableSnapshot.getTable(), e);
        }
    }

    private List<AddFileEntry> loadActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session, FileSystem fileSystem) {
        try (Stream entries = this.getEntries(tableSnapshot, (Set<CheckpointEntryIterator.EntryType>)ImmutableSet.of((Object)((Object)CheckpointEntryIterator.EntryType.ADD)), this::activeAddEntries, session, fileSystem, this.hdfsEnvironment, this.fileFormatDataSourceStats);){
            List activeFiles;
            List list = activeFiles = (List)entries.collect(ImmutableList.toImmutableList());
            return list;
        }
    }

    public static List<ColumnMetadata> columnsWithStats(MetadataEntry metadataEntry, TypeManager typeManager) {
        return TransactionLogAccess.columnsWithStats(DeltaLakeSchemaSupport.extractSchema(metadataEntry, typeManager), metadataEntry.getCanonicalPartitionColumns());
    }

    public static ImmutableList<ColumnMetadata> columnsWithStats(List<ColumnMetadata> schema, List<String> partitionColumns) {
        return (ImmutableList)schema.stream().filter(column -> !partitionColumns.contains(column.getName())).filter(column -> {
            Type type = column.getType();
            return !(type instanceof MapType) && !(type instanceof ArrayType) && !type.equals(BooleanType.BOOLEAN) && !type.equals(VarbinaryType.VARBINARY);
        }).collect(ImmutableList.toImmutableList());
    }

    private Stream<AddFileEntry> activeAddEntries(Stream<DeltaLakeTransactionLogEntry> checkpointEntries, Stream<DeltaLakeTransactionLogEntry> jsonEntries) {
        LinkedHashMap activeJsonEntries = new LinkedHashMap();
        HashSet removedFiles = new HashSet();
        jsonEntries.forEach(deltaLakeTransactionLogEntry -> {
            RemoveFileEntry removeEntry;
            AddFileEntry addEntry = deltaLakeTransactionLogEntry.getAdd();
            if (addEntry != null) {
                activeJsonEntries.put(addEntry.getPath(), addEntry);
            }
            if ((removeEntry = deltaLakeTransactionLogEntry.getRemove()) != null) {
                activeJsonEntries.remove(removeEntry.getPath());
                removedFiles.add(removeEntry.getPath());
            }
        });
        Stream<AddFileEntry> filteredCheckpointEntries = checkpointEntries.map(DeltaLakeTransactionLogEntry::getAdd).filter(Objects::nonNull).filter(addEntry -> !removedFiles.contains(addEntry.getPath()) && !activeJsonEntries.containsKey(addEntry.getPath()));
        return Stream.concat(filteredCheckpointEntries, activeJsonEntries.values().stream());
    }

    public Stream<RemoveFileEntry> getRemoveEntries(TableSnapshot tableSnapshot, ConnectorSession session) {
        return this.getEntries(tableSnapshot, CheckpointEntryIterator.EntryType.REMOVE, (Stream<DeltaLakeTransactionLogEntry> entryStream) -> entryStream.map(DeltaLakeTransactionLogEntry::getRemove).filter(Objects::nonNull), session, this.getFileSystem(tableSnapshot, session), this.hdfsEnvironment, this.fileFormatDataSourceStats);
    }

    public Stream<ProtocolEntry> getProtocolEntries(TableSnapshot tableSnapshot, ConnectorSession session) {
        return this.getEntries(tableSnapshot, CheckpointEntryIterator.EntryType.PROTOCOL, (Stream<DeltaLakeTransactionLogEntry> entryStream) -> entryStream.map(DeltaLakeTransactionLogEntry::getProtocol).filter(Objects::nonNull), session, this.getFileSystem(tableSnapshot, session), this.hdfsEnvironment, this.fileFormatDataSourceStats);
    }

    public Stream<CommitInfoEntry> getCommitInfoEntries(TableSnapshot tableSnapshot, ConnectorSession session) {
        return this.getEntries(tableSnapshot, CheckpointEntryIterator.EntryType.COMMIT, (Stream<DeltaLakeTransactionLogEntry> entryStream) -> entryStream.map(DeltaLakeTransactionLogEntry::getCommitInfo).filter(Objects::nonNull), session, this.getFileSystem(tableSnapshot, session), this.hdfsEnvironment, this.fileFormatDataSourceStats);
    }

    private <T> Stream<T> getEntries(TableSnapshot tableSnapshot, Set<CheckpointEntryIterator.EntryType> entryTypes, BiFunction<Stream<DeltaLakeTransactionLogEntry>, Stream<DeltaLakeTransactionLogEntry>, Stream<T>> entryMapper, ConnectorSession session, FileSystem fileSystem, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats) {
        try {
            Stream jsonEntries = tableSnapshot.getJsonTransactionLogEntries().stream();
            Stream<DeltaLakeTransactionLogEntry> checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries(session, entryTypes, this.checkpointSchemaManager, this.typeManager, fileSystem, hdfsEnvironment, stats);
            return entryMapper.apply(checkpointEntries, jsonEntries);
        }
        catch (IOException e) {
            throw new TrinoException((ErrorCodeSupplier)DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Error reading transaction log for " + tableSnapshot.getTable(), (Throwable)e);
        }
    }

    private <T> Stream<T> getEntries(TableSnapshot tableSnapshot, CheckpointEntryIterator.EntryType entryType, Function<Stream<DeltaLakeTransactionLogEntry>, Stream<T>> entryMapper, ConnectorSession session, FileSystem fileSystem, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats) {
        return this.getEntries(tableSnapshot, (Set<CheckpointEntryIterator.EntryType>)ImmutableSet.of((Object)((Object)entryType)), (Stream<DeltaLakeTransactionLogEntry> checkpointStream, Stream<DeltaLakeTransactionLogEntry> jsonStream) -> (Stream)entryMapper.apply(Stream.concat(checkpointStream, jsonStream)), session, fileSystem, hdfsEnvironment, stats);
    }

    public Stream<DeltaLakeTransactionLogEntry> getJsonEntries(FileSystem fileSystem, Path transactionLogDir, List<Long> forVersions) {
        return forVersions.stream().flatMap(version -> {
            try {
                Optional<List<DeltaLakeTransactionLogEntry>> entriesFromJson = TransactionLogTail.getEntriesFromJson(TransactionLogUtil.getTransactionLogJsonEntryPath(transactionLogDir, version), fileSystem);
                return entriesFromJson.map(Collection::stream).orElseGet(() -> Stream.of(new DeltaLakeTransactionLogEntry[0]));
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    public List<Long> getPastTableVersions(FileSystem fileSystem, Path transactionLogDir, Instant startAt, long lastVersion) {
        ImmutableList.Builder result = ImmutableList.builder();
        for (long version = lastVersion; version >= 0L; --version) {
            FileStatus fileStatus;
            try {
                fileStatus = fileSystem.getFileStatus(TransactionLogUtil.getTransactionLogJsonEntryPath(transactionLogDir, version));
            }
            catch (IOException e) {
                if (TransactionLogTail.isFileNotFoundException(e)) {
                    return null;
                }
                throw new UncheckedIOException(e);
            }
            if (fileStatus.getModificationTime() < startAt.toEpochMilli()) break;
            result.add((Object)version);
        }
        return result.build();
    }

    private List<DeltaLakeTransactionLogEntry> getJsonEntries(long startVersion, long endVersion, TableSnapshot tableSnapshot, FileSystem fileSystem) throws IOException {
        Optional<Long> lastCheckpointVersion = tableSnapshot.getLastCheckpointVersion();
        if (lastCheckpointVersion.isPresent() && startVersion < lastCheckpointVersion.get()) {
            return ImmutableList.builder().addAll(TransactionLogTail.loadNewTail(fileSystem, tableSnapshot.getTableLocation(), Optional.of(startVersion), lastCheckpointVersion).getFileEntries()).addAll(tableSnapshot.getJsonTransactionLogEntries()).build();
        }
        return TransactionLogTail.loadNewTail(fileSystem, tableSnapshot.getTableLocation(), Optional.of(startVersion), Optional.of(endVersion)).getFileEntries();
    }

    private FileSystem getFileSystem(TableSnapshot tableSnapshot, ConnectorSession session) {
        return this.getFileSystem(tableSnapshot.getTableLocation(), tableSnapshot.getTable(), session);
    }

    protected FileSystem getFileSystem(Path tableLocation, SchemaTableName table, ConnectorSession session) {
        try {
            return this.hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(session), tableLocation);
        }
        catch (IOException e) {
            throw new TrinoException((ErrorCodeSupplier)DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Failed accessing transaction log for table: " + table, (Throwable)e);
        }
    }

    public static <T> String canonicalizeColumnName(String columnName) {
        return columnName.toLowerCase(Locale.ENGLISH);
    }

    public static <T> Map<CanonicalColumnName, T> toCanonicalNameKeyedMap(Map<String, T> map) {
        return (Map)map.entrySet().stream().collect(ImmutableMap.toImmutableMap(entry -> new CanonicalColumnName((String)entry.getKey()), Map.Entry::getValue));
    }

    public static <T> Map<CanonicalColumnName, T> toCanonicalNameKeyedMap(Map<String, T> map, Map<String, CanonicalColumnName> canonicalColumnNames) {
        return (Map)map.entrySet().stream().collect(ImmutableMap.toImmutableMap(entry -> Objects.requireNonNull((CanonicalColumnName)canonicalColumnNames.get(entry.getKey()), String.format("Did not find CanonicalColumnName for %s", entry.getKey())), Map.Entry::getValue));
    }

    @Deprecated
    public static <T> Map<String, T> toOriginalNameKeyedMap(Map<CanonicalColumnName, T> map) {
        return (Map)map.entrySet().stream().collect(ImmutableMap.toImmutableMap(entry -> ((CanonicalColumnName)entry.getKey()).getOriginalName(), Map.Entry::getValue));
    }
}

