/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.units.DataSize;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
import io.trino.plugin.deltalake.AnalyzeHandle;
import io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.DeltaLakeColumnType;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeMetadata;
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.DeltaLakeSplit;
import io.trino.plugin.deltalake.DeltaLakeSplitSource;
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
import io.trino.plugin.deltalake.DeltaLakeTransactionManager;
import io.trino.plugin.deltalake.ForDeltaLakeSplitManager;
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesSplitSource;
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesTableFunctionHandle;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogParser;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
import io.trino.plugin.deltalake.util.DeltaLakeDomains;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;
import java.net.URI;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class DeltaLakeSplitManager
implements ConnectorSplitManager {
    // Passed to DeltaLakeSchemaSupport.extractSchema when resolving the table schema.
    private final TypeManager typeManager;
    // Source of the active data files (AddFileEntry stream) for a table snapshot.
    private final TransactionLogAccess transactionLogAccess;
    // Handed to every DeltaLakeSplitSource created by this manager.
    private final ExecutorService executor;
    // Read from DeltaLakeConfig; forwarded to DeltaLakeSplitSource.
    private final int maxSplitsPerSecond;
    // Read from DeltaLakeConfig; forwarded to DeltaLakeSplitSource.
    private final int maxOutstandingSplits;
    // Read from DeltaLakeConfig; lower bound used when clamping the proportional split weight.
    private final double minimumAssignedSplitWeight;
    // Passed to TableChangesSplitSource for table-function splits.
    private final TrinoFileSystemFactory fileSystemFactory;
    // Used to look up the per-transaction object from which the table snapshot is obtained.
    private final DeltaLakeTransactionManager deltaLakeTransactionManager;
    // Asked for the host addresses of each split, keyed by the split path.
    private final CachingHostAddressProvider cachingHostAddressProvider;

    /**
     * Creates the split manager from its injected collaborators.
     *
     * @param typeManager type manager used when extracting the table schema
     * @param transactionLogAccess access to the active files of a table snapshot
     * @param executor executor handed to the split sources created by this manager
     * @param config supplies the maximum splits per second, the maximum number of outstanding
     *        splits and the minimum assigned split weight
     * @param fileSystemFactory file system factory used for table-function splits
     * @param deltaLakeTransactionManager resolves the per-transaction state used to load snapshots
     * @param cachingHostAddressProvider resolves the host addresses attached to each split
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public DeltaLakeSplitManager(TypeManager typeManager, TransactionLogAccess transactionLogAccess, @ForDeltaLakeSplitManager ExecutorService executor, DeltaLakeConfig config, TrinoFileSystemFactory fileSystemFactory, DeltaLakeTransactionManager deltaLakeTransactionManager, CachingHostAddressProvider cachingHostAddressProvider) {
        this.typeManager = Objects.requireNonNull(typeManager, "typeManager is null");
        this.transactionLogAccess = Objects.requireNonNull(transactionLogAccess, "transactionLogAccess is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        // Fail with a descriptive message instead of a bare NPE on the first getter call.
        Objects.requireNonNull(config, "config is null");
        this.maxSplitsPerSecond = config.getMaxSplitsPerSecond();
        this.maxOutstandingSplits = config.getMaxOutstandingSplits();
        this.minimumAssignedSplitWeight = config.getMinimumAssignedSplitWeight();
        this.fileSystemFactory = Objects.requireNonNull(fileSystemFactory, "fileSystemFactory is null");
        this.deltaLakeTransactionManager = Objects.requireNonNull(deltaLakeTransactionManager, "deltaLakeTransactionManager is null");
        this.cachingHostAddressProvider = Objects.requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null");
    }

    /**
     * Creates the split source for scanning a Delta Lake table.
     *
     * <p>When either the enforced partition constraint or the non-partition constraint of the
     * handle is {@code none}, no file can match and an empty split source is returned without
     * touching the transaction log. Otherwise the splits are produced by a
     * {@link DeltaLakeSplitSource} wrapped so that it runs with this plugin's class loader.
     *
     * @param transaction transaction the scan belongs to
     * @param session session supplying identity and session properties
     * @param handle table handle; must be a {@link DeltaLakeTableHandle}
     * @param dynamicFilter dynamic filter; its covered columns take part in statistics pruning
     * @param constraint additional constraint whose predicate, if present, is tested per file
     * @return the split source for the table scan
     * @throws ClassCastException if {@code handle} is not a {@link DeltaLakeTableHandle}
     */
    @Override
    public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle handle, DynamicFilter dynamicFilter, Constraint constraint) {
        DeltaLakeTableHandle deltaLakeTableHandle = (DeltaLakeTableHandle) handle;
        if (deltaLakeTableHandle.getEnforcedPartitionConstraint().isNone() || deltaLakeTableHandle.getNonPartitionConstraint().isNone()) {
            if (deltaLakeTableHandle.isRecordScannedFiles()) {
                // NOTE(review): the second (empty) list presumably carries the scanned-files
                // information expected when scanned files are recorded - confirm against FixedSplitSource.
                return new FixedSplitSource(ImmutableList.of(), ImmutableList.of());
            }
            return FixedSplitSource.emptySplitSource();
        }
        DeltaLakeSplitSource splitSource = new DeltaLakeSplitSource(
                deltaLakeTableHandle.getSchemaTableName(),
                this.getSplits(transaction, deltaLakeTableHandle, session, deltaLakeTableHandle.getMaxScannedFileSize(), dynamicFilter.getColumnsCovered(), constraint),
                this.executor,
                this.maxSplitsPerSecond,
                this.maxOutstandingSplits,
                dynamicFilter,
                DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout(session),
                deltaLakeTableHandle.isRecordScannedFiles());
        return new ClassLoaderSafeConnectorSplitSource(splitSource, DeltaLakeSplitManager.class.getClassLoader());
    }

    /**
     * Creates the split source for a table function invocation.
     *
     * <p>Only {@link TableChangesTableFunctionHandle} is supported.
     *
     * @param transaction transaction the invocation belongs to (not used here)
     * @param session session passed to the split source
     * @param function handle of the table function being executed
     * @return a {@link TableChangesSplitSource} for the given handle
     * @throws UnsupportedOperationException if {@code function} is any other handle type (or {@code null})
     */
    @Override
    public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle function) {
        if (function instanceof TableChangesTableFunctionHandle tableFunctionHandle) {
            return new TableChangesSplitSource(session, this.fileSystemFactory, tableFunctionHandle);
        }
        throw new UnsupportedOperationException("Unrecognized function: " + function);
    }

    /**
     * Builds the stream of splits for all active data files of the table that survive pruning.
     *
     * <p>The returned stream is a {@code flatMap} over the active files, so the per-file checks
     * run only as the stream is consumed. A file is dropped when, in this order:
     * <ol>
     *   <li>an analyze handle is present, its mode is not {@code FULL_REFRESH}, and the file is
     *       not a data change;</li>
     *   <li>its path does not match the path domain;</li>
     *   <li>the analyze handle's "files modified after" instant is present and the file is not newer;</li>
     *   <li>its modification time does not match the file-modified-time domain;</li>
     *   <li>it has no deletion vector and is larger than the maximum scanned file size;</li>
     *   <li>its size does not match the file-size domain;</li>
     *   <li>its partition values do not match the enforced partition domains;</li>
     *   <li>its statistics predicate does not overlap the non-partition constraint;</li>
     *   <li>the constraint predicate, if present, rejects its deserialized partition values.</li>
     * </ol>
     *
     * @param transaction transaction used to look up the snapshot
     * @param tableHandle handle describing the table, its constraints and its read version
     * @param session session supplying identity and session properties
     * @param maxScannedFileSize optional upper bound on the size of scanned files; required for OPTIMIZE
     * @param columnsCoveredByDynamicFilter columns covered by the dynamic filter; must be {@link DeltaLakeColumnHandle}s
     * @param constraint constraint whose predicate, if present, is evaluated against partition values
     * @return stream of splits for the surviving files
     * @throws IllegalArgumentException if the handle is an OPTIMIZE handle and no maximum scanned file size is given
     */
    private Stream<DeltaLakeSplit> getSplits(ConnectorTransactionHandle transaction, DeltaLakeTableHandle tableHandle, ConnectorSession session, Optional<DataSize> maxScannedFileSize, Set<ColumnHandle> columnsCoveredByDynamicFilter, Constraint constraint) {
        TableSnapshot tableSnapshot = this.deltaLakeTransactionManager.get(transaction, session.getIdentity())
                .getSnapshot(session, tableHandle.getSchemaTableName(), tableHandle.getLocation(), Optional.of(tableHandle.getReadVersion()));
        Stream<AddFileEntry> validDataFiles = this.transactionLogAccess.getActiveFiles(
                session,
                tableSnapshot,
                tableHandle.getMetadataEntry(),
                tableHandle.getProtocolEntry(),
                tableHandle.getEnforcedPartitionConstraint(),
                tableHandle.getProjectedColumns().orElse(ImmutableSet.of()));
        TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
        TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint = tableHandle.getNonPartitionConstraint();
        Domain pathDomain = DeltaLakeDomains.getPathDomain(nonPartitionConstraint);
        Domain fileModifiedDomain = DeltaLakeDomains.getFileModifiedTimeDomain(nonPartitionConstraint);
        Domain fileSizeDomain = DeltaLakeDomains.getFileSizeDomain(nonPartitionConstraint);
        // Files are split only when the handle has no write type and a regular (data) column may be projected.
        boolean splittable = tableHandle.getWriteType().isEmpty() && DeltaLakeSplitManager.mayAnyDataColumnProjected(tableHandle);
        Optional<Instant> filesModifiedAfter = tableHandle.getAnalyzeHandle().flatMap(AnalyzeHandle::filesModifiedAfter);
        Optional<Long> maxScannedFileSizeInBytes = maxScannedFileSize.map(DataSize::toBytes);
        MetadataEntry metadataEntry = tableHandle.getMetadataEntry();
        boolean isOptimize = tableHandle.isOptimize();
        if (isOptimize) {
            Preconditions.checkArgument(maxScannedFileSizeInBytes.isPresent(), "maxScannedFileSizeInBytes must be provided when performing OPTIMIZE");
            validDataFiles = DeltaLakeSplitManager.filterValidDataFilesForOptimize(validDataFiles, maxScannedFileSizeInBytes.get());
        }
        // Statistics are only evaluated for columns that appear in the non-partition constraint
        // or that the dynamic filter covers.
        Set<String> predicatedColumnNames = Stream.concat(
                        nonPartitionConstraint.getDomains().orElseThrow().keySet().stream(),
                        columnsCoveredByDynamicFilter.stream().map(DeltaLakeColumnHandle.class::cast))
                .map(DeltaLakeColumnHandle::baseColumnName)
                .collect(ImmutableSet.toImmutableSet());
        List<DeltaLakeColumnMetadata> schema = DeltaLakeSchemaSupport.extractSchema(metadataEntry, tableHandle.getProtocolEntry(), this.typeManager);
        List<DeltaLakeColumnMetadata> predicatedColumns = schema.stream()
                .filter(column -> predicatedColumnNames.contains(column.name()))
                .collect(ImmutableList.toImmutableList());
        return validDataFiles.flatMap(addAction -> {
            // Outside FULL_REFRESH analyze, files that do not introduce a data change are skipped.
            if (tableHandle.getAnalyzeHandle().isPresent()
                    && tableHandle.getAnalyzeHandle().get().analyzeMode() != DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH
                    && !addAction.isDataChange()) {
                return Stream.empty();
            }
            String splitPath = DeltaLakeSplitManager.buildSplitPath(Location.of(tableHandle.getLocation()), addAction).toString();
            if (!DeltaLakeDomains.pathMatchesPredicate(pathDomain, splitPath)) {
                return Stream.empty();
            }
            if (filesModifiedAfter.isPresent() && addAction.getModificationTime() <= filesModifiedAfter.get().toEpochMilli()) {
                return Stream.empty();
            }
            if (!DeltaLakeDomains.fileModifiedTimeMatchesPredicate(fileModifiedDomain, addAction.getModificationTime())) {
                return Stream.empty();
            }
            // The size limit is applied only to files without a deletion vector.
            if (addAction.getDeletionVector().isEmpty() && maxScannedFileSizeInBytes.isPresent() && addAction.getSize() > maxScannedFileSizeInBytes.get()) {
                return Stream.empty();
            }
            if (!DeltaLakeDomains.fileSizeMatchesPredicate(fileSizeDomain, addAction.getSize())) {
                return Stream.empty();
            }
            Map<DeltaLakeColumnHandle, Domain> enforcedDomains = enforcedPartitionConstraint.getDomains().orElseThrow();
            if (!DeltaLakeDomains.partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), enforcedDomains)) {
                return Stream.empty();
            }
            TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = DeltaLakeMetadata.createStatisticsPredicate(addAction, predicatedColumns, metadataEntry.getLowercasePartitionColumns());
            if (!nonPartitionConstraint.overlaps(statisticsPredicate)) {
                return Stream.empty();
            }
            if (constraint.predicate().isPresent()) {
                // Only base columns that have a partition value for this file are handed to the predicate.
                Map<String, Optional<String>> partitionValues = addAction.getCanonicalPartitionValues();
                Map<ColumnHandle, NullableValue> deserializedValues = constraint.getPredicateColumns().orElseThrow().stream()
                        .map(DeltaLakeColumnHandle.class::cast)
                        .filter(column -> column.isBaseColumn() && partitionValues.containsKey(column.baseColumnName()))
                        .collect(ImmutableMap.toImmutableMap(
                                Function.identity(),
                                column -> new NullableValue(
                                        column.baseType(),
                                        TransactionLogParser.deserializePartitionValue(column, partitionValues.get(column.baseColumnName())))));
                if (!constraint.predicate().get().test(deserializedValues)) {
                    return Stream.empty();
                }
            }
            return this.splitsForFile(session, addAction, splitPath, addAction.getCanonicalPartitionValues(), statisticsPredicate, splittable).stream();
        });
    }

    /**
     * Restricts the files considered by OPTIMIZE to those strictly smaller than the given size,
     * and only emits files from partitions that contain at least two such files.
     *
     * <p>The first qualifying file seen for a partition is held back. When a second file for the
     * same partition arrives, both are emitted and the partition is marked so that every later
     * file is emitted immediately. A partition with a single qualifying file therefore emits nothing.
     *
     * <p>The returned stream captures mutable state in its lambda, so it relies on the files
     * being processed one at a time (not in parallel).
     *
     * @param validDataFiles candidate files
     * @param maxScannedFileSizeInBytes exclusive upper bound on the size of files to keep
     * @return the filtered stream
     */
    private static Stream<AddFileEntry> filterValidDataFilesForOptimize(Stream<AddFileEntry> validDataFiles, long maxScannedFileSizeInBytes) {
        // Per partition: a present value is the single file held back so far;
        // an empty value marks a partition for which two or more files were already emitted.
        Map<Map<String, Optional<String>>, Optional<AddFileEntry>> pendingAddFileEntriesMap = new HashMap<>();
        return validDataFiles
                .filter(addFileEntry -> addFileEntry.getSize() < maxScannedFileSizeInBytes)
                .flatMap(addFileEntry -> {
                    Map<String, Optional<String>> canonicalPartitionValues = addFileEntry.getCanonicalPartitionValues();
                    // Stored values are never null, so a null lookup means "partition not seen yet".
                    Optional<AddFileEntry> alreadyQueuedAddFileEntry = pendingAddFileEntriesMap.get(canonicalPartitionValues);
                    if (alreadyQueuedAddFileEntry == null) {
                        pendingAddFileEntriesMap.put(canonicalPartitionValues, Optional.of(addFileEntry));
                        return Stream.empty();
                    }
                    if (alreadyQueuedAddFileEntry.isEmpty()) {
                        return Stream.of(addFileEntry);
                    }
                    pendingAddFileEntriesMap.put(canonicalPartitionValues, Optional.empty());
                    return Stream.of(alreadyQueuedAddFileEntry.get(), addFileEntry);
                });
    }

    /**
     * Tells whether the scan may read at least one regular (data) column.
     *
     * @param tableHandle handle whose projected columns are inspected
     * @return {@code true} when the handle carries no projection at all, or when any projected
     *         column has type {@link DeltaLakeColumnType#REGULAR}; {@code false} otherwise
     */
    private static boolean mayAnyDataColumnProjected(DeltaLakeTableHandle tableHandle) {
        return tableHandle.getProjectedColumns()
                .map(columns -> columns.stream()
                        .anyMatch(column -> DeltaLakeColumnType.REGULAR.equals(column.columnType())))
                // No projection recorded: any column may end up being read.
                .orElse(true);
    }

    /**
     * Creates the splits covering a single data file.
     *
     * <p>When the file is not splittable, one split spanning the whole file is returned; it
     * carries the record count from the file statistics (if any) and the standard split weight.
     * Otherwise the file is cut into consecutive ranges of at most the session's maximum split
     * size; each range gets a weight proportional to its size relative to that maximum, clamped
     * to {@code [minimumAssignedSplitWeight, 1.0]}, and no record count. A splittable file of
     * size zero yields no splits.
     *
     * @param session session supplying the maximum split size
     * @param addFileEntry transaction log entry of the file
     * @param splitPath resolved path of the file
     * @param partitionKeys canonical partition values of the file
     * @param statisticsPredicate predicate derived from the file statistics
     * @param splittable whether the file may be divided into several splits
     * @return the splits for the file
     */
    private List<DeltaLakeSplit> splitsForFile(ConnectorSession session, AddFileEntry addFileEntry, String splitPath, Map<String, Optional<String>> partitionKeys, TupleDomain<DeltaLakeColumnHandle> statisticsPredicate, boolean splittable) {
        long fileSize = addFileEntry.getSize();
        if (!splittable) {
            return ImmutableList.of(new DeltaLakeSplit(
                    splitPath,
                    0L,
                    fileSize,
                    fileSize,
                    addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords),
                    addFileEntry.getModificationTime(),
                    addFileEntry.getDeletionVector(),
                    this.cachingHostAddressProvider.getHosts(splitPath, ImmutableList.of()),
                    SplitWeight.standard(),
                    statisticsPredicate,
                    partitionKeys));
        }
        // The session property does not change while iterating, so read it once.
        // NOTE(review): assumes the configured max split size is positive; with zero the offset
        // below would never advance - confirm the session property validates this.
        long maxSplitSize = DeltaLakeSessionProperties.getMaxSplitSize(session).toBytes();
        ImmutableList.Builder<DeltaLakeSplit> splits = ImmutableList.builder();
        long currentOffset = 0L;
        while (currentOffset < fileSize) {
            long splitSize = Math.min(maxSplitSize, fileSize - currentOffset);
            splits.add(new DeltaLakeSplit(
                    splitPath,
                    currentOffset,
                    splitSize,
                    fileSize,
                    Optional.empty(),
                    addFileEntry.getModificationTime(),
                    addFileEntry.getDeletionVector(),
                    this.cachingHostAddressProvider.getHosts(splitPath, ImmutableList.of()),
                    SplitWeight.fromProportion(Math.clamp((double) splitSize / (double) maxSplitSize, this.minimumAssignedSplitWeight, 1.0)),
                    statisticsPredicate,
                    partitionKeys));
            currentOffset += splitSize;
        }
        return splits.build();
    }

    /**
     * Resolves the location of a data file referenced by a transaction log entry.
     *
     * <p>The entry's path is parsed as a URI. A relative URI is appended, using its decoded
     * path, to the table location. An absolute URI is turned into a location of the form
     * {@code scheme:scheme-specific-part}, using the decoded scheme-specific part.
     *
     * @param tableLocation root location of the table
     * @param addAction transaction log entry whose path is resolved
     * @return the location of the data file
     * @throws IllegalArgumentException if the entry's path is not a valid URI
     */
    public static Location buildSplitPath(Location tableLocation, AddFileEntry addAction) {
        URI fileUri = URI.create(addAction.getPath());
        if (!fileUri.isAbsolute()) {
            // Relative entry: resolve against the table root.
            return tableLocation.appendPath(fileUri.getPath());
        }
        String scheme = fileUri.getScheme();
        String schemeSpecificPart = fileUri.getSchemeSpecificPart();
        return Location.of(scheme + ":" + schemeSpecificPart);
    }
}

