/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
import io.airlift.units.DataSize;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.filesystem.cache.DefaultCachingHostAddressProvider;
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
import io.trino.filesystem.memory.MemoryFileSystemFactory;
import io.trino.metastore.HiveMetastore;
import io.trino.metastore.HiveMetastoreFactory;
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
import io.trino.plugin.deltalake.DataFileInfo;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeMergeResult;
import io.trino.plugin.deltalake.DeltaLakeMetadataFactory;
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.DeltaLakeSplit;
import io.trino.plugin.deltalake.DeltaLakeSplitManager;
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
import io.trino.plugin.deltalake.DeltaLakeTransactionManager;
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
import io.trino.plugin.deltalake.metastore.DeltaLakeTableOperationsProvider;
import io.trino.plugin.deltalake.metastore.file.DeltaLakeFileMetastoreTableOperationsProvider;
import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess;
import io.trino.plugin.deltalake.statistics.ExtendedStatistics;
import io.trino.plugin.deltalake.statistics.ExtendedStatisticsAccess;
import io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
import io.trino.plugin.deltalake.transactionlog.writer.NoIsolationSynchronizer;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizerManager;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
import io.trino.plugin.hive.HiveTestUtils;
import io.trino.plugin.hive.HiveTransactionHandle;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.metastore.file.TestingFileHiveMetastore;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
import io.trino.spi.NodeManager;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;
import io.trino.testing.TestingConnectorContext;
import io.trino.testing.TestingConnectorSession;
import io.trino.testing.TestingNodeManager;
import io.trino.type.InternalTypeManager;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DeltaLakeSplitManager}.
 *
 * <p>Covers two behaviors:
 * <ul>
 *   <li>Add-file entries larger than the configured maximum split size are cut into consecutive byte
 *       ranges.</li>
 *   <li>Relative file paths are resolved against the table location, and absolute URI-encoded paths are
 *       decoded before being placed into the split.</li>
 * </ul>
 */
public class TestDeltaLakeSplitManager {
    private static final String TABLE_PATH = "/path/to/a/table";
    private static final String FILE_PATH = "directory/file";
    // Location expected in splits for the relative FILE_PATH entries: the table location joined with FILE_PATH.
    private static final String FULL_PATH = TABLE_PATH + "/" + FILE_PATH;

    private static final MetadataEntry METADATA_ENTRY = new MetadataEntry(
            "id",
            "name",
            "description",
            new MetadataEntry.Format("provider", ImmutableMap.of()),
            "{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}",
            ImmutableList.of(),
            ImmutableMap.of(),
            0L);

    private static final DeltaLakeTableHandle TABLE_HANDLE = new DeltaLakeTableHandle(
            "schema",
            "table",
            true,
            TABLE_PATH,
            METADATA_ENTRY,
            new ProtocolEntry(1, 2, Optional.empty(), Optional.empty()),
            TupleDomain.all(),
            TupleDomain.all(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            0L,
            false);

    private final HiveTransactionHandle transactionHandle = new HiveTransactionHandle(true);

    /**
     * Absolute add-file paths keep their scheme and authority.
     *
     * <p>Percent-encoded segments (e.g. {@code %25}) are decoded exactly once. Doubled slashes and trailing
     * slashes are preserved.
     */
    @Test
    public void testAbsolutePathSplits() throws Exception {
        testAbsolutePathSplits("file://path/to/file", "file://path/to/file");
        testAbsolutePathSplits("abfs://ct@st.dfs.core.windows.net/path/to/file", "abfs://ct@st.dfs.core.windows.net/path/to/file");
        testAbsolutePathSplits("hdfs://path/to/file", "hdfs://path/to/file");
        testAbsolutePathSplits("s3://my-s3-bucket/path/to//file", "s3://my-s3-bucket/path/to//file");
        testAbsolutePathSplits("s3://my-s3-bucket/path/to//file/", "s3://my-s3-bucket/path/to//file/");
        testAbsolutePathSplits("gs://my-gcp-bucket/path/to/file", "gs://my-gcp-bucket/path/to/file");
        testAbsolutePathSplits("abfs://ct@st.dfs.core.windows.net/+ab+/a%25/a%2525/path/to/file", "abfs://ct@st.dfs.core.windows.net/+ab+/a%/a%25/path/to/file");
    }

    /**
     * Splits one 20000-byte file whose transaction-log path is absolute.
     *
     * <p>With a 5000-byte max split size, this expects four equal splits that all carry the decoded path.
     *
     * @param absoluteRawEncodedFilePath path as it appears in the add-file entry
     * @param absoluteDecodedParsedFilePath path expected in the produced splits
     */
    private void testAbsolutePathSplits(String absoluteRawEncodedFilePath, String absoluteDecodedParsedFilePath) throws Exception {
        long fileSize = 20000L;
        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(absoluteRawEncodedFilePath, fileSize));
        DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig().setMaxSplitSize(DataSize.ofBytes(5000L));
        double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

        DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
        List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

        List<DeltaLakeSplit> expected = ImmutableList.of(
                makeSplit(absoluteDecodedParsedFilePath, 0L, 5000L, fileSize, minimumAssignedSplitWeight),
                makeSplit(absoluteDecodedParsedFilePath, 5000L, 5000L, fileSize, minimumAssignedSplitWeight),
                makeSplit(absoluteDecodedParsedFilePath, 10000L, 5000L, fileSize, minimumAssignedSplitWeight),
                makeSplit(absoluteDecodedParsedFilePath, 15000L, 5000L, fileSize, minimumAssignedSplitWeight));
        Assertions.assertThat(splits).isEqualTo(expected);
    }

    /** A 50000-byte file with a 20000-byte max split size yields two full splits plus a 10000-byte tail. */
    @Test
    public void testSplitSizes() throws ExecutionException, InterruptedException {
        long fileSize = 50000L;
        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, fileSize));
        DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig().setMaxSplitSize(DataSize.ofBytes(20000L));
        double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

        DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
        List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

        List<DeltaLakeSplit> expected = ImmutableList.of(
                makeSplit(FULL_PATH, 0L, 20000L, fileSize, minimumAssignedSplitWeight),
                makeSplit(FULL_PATH, 20000L, 20000L, fileSize, minimumAssignedSplitWeight),
                makeSplit(FULL_PATH, 40000L, 10000L, fileSize, minimumAssignedSplitWeight));
        Assertions.assertThat(splits).isEqualTo(expected);
    }

    /**
     * Splits from several add-file entries are emitted in entry order.
     *
     * <p>A file smaller than the max split size becomes a single split.
     */
    @Test
    public void testSplitsFromMultipleFiles() throws ExecutionException, InterruptedException {
        long firstFileSize = 1000L;
        long secondFileSize = 20000L;
        List<AddFileEntry> addFileEntries = ImmutableList.of(
                addFileEntryOfSize(FILE_PATH, firstFileSize),
                addFileEntryOfSize(FILE_PATH, secondFileSize));
        DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig().setMaxSplitSize(DataSize.ofBytes(10000L));
        double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

        DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
        List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

        List<DeltaLakeSplit> expected = ImmutableList.of(
                makeSplit(FULL_PATH, 0L, 1000L, firstFileSize, minimumAssignedSplitWeight),
                makeSplit(FULL_PATH, 0L, 10000L, secondFileSize, minimumAssignedSplitWeight),
                makeSplit(FULL_PATH, 10000L, 10000L, secondFileSize, minimumAssignedSplitWeight));
        Assertions.assertThat(splits).isEqualTo(expected);
    }

    /**
     * Builds a split manager whose transaction log reports exactly {@code addFileEntries} as the active files.
     *
     * <p>Before returning, it begins {@link #transactionHandle} on the transaction manager and loads a
     * snapshot for {@link #TABLE_HANDLE}.
     *
     * @param addFileEntries entries the stubbed {@code getActiveFiles} returns
     * @param deltaLakeConfig configuration for the transaction log, session and split manager
     * @return a split manager ready for {@link #getSplits}
     */
    private DeltaLakeSplitManager setupSplitManager(final List<AddFileEntry> addFileEntries, DeltaLakeConfig deltaLakeConfig) {
        TestingConnectorContext context = new TestingConnectorContext();
        TypeManager typeManager = context.getTypeManager();
        TrinoFileSystemFactory hdfsFileSystemFactory = new HdfsFileSystemFactory(HiveTestUtils.HDFS_ENVIRONMENT, HiveTestUtils.HDFS_FILE_SYSTEM_STATS);

        // The anonymous-class arguments go straight to TransactionLogAccess's constructor. Do not pass `this`
        // (the decompiled form did, which does not compile).
        TransactionLogAccess transactionLogAccess = new TransactionLogAccess(
                typeManager,
                new CheckpointSchemaManager(typeManager),
                deltaLakeConfig,
                new FileFormatDataSourceStats(),
                hdfsFileSystemFactory,
                new ParquetReaderConfig(),
                MoreExecutors.newDirectExecutorService()) {
            // Stub: ignore the snapshot and constraints and return the fixed add-file entries.
            @Override
            public Stream<AddFileEntry> getActiveFiles(ConnectorSession session, TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TupleDomain<DeltaLakeColumnHandle> partitionConstraint, Set<DeltaLakeColumnHandle> projectedColumns) {
                return addFileEntries.stream();
            }
        };

        CheckpointWriterManager checkpointWriterManager = new CheckpointWriterManager(
                typeManager,
                new CheckpointSchemaManager(typeManager),
                hdfsFileSystemFactory,
                new NodeVersion("test_version"),
                transactionLogAccess,
                new FileFormatDataSourceStats(),
                JsonCodec.jsonCodec(LastCheckpoint.class),
                new DeltaLakeConfig(),
                MoreExecutors.newDirectExecutorService());

        HiveMetastoreFactory hiveMetastoreFactory = HiveMetastoreFactory.ofInstance(
                TestingFileHiveMetastore.createTestingFileHiveMetastore(new MemoryFileSystemFactory(), Location.of("memory:///")));

        DeltaLakeMetadataFactory metadataFactory = new DeltaLakeMetadataFactory(
                hiveMetastoreFactory,
                hdfsFileSystemFactory,
                transactionLogAccess,
                typeManager,
                new DeltaLakeConfig(),
                JsonCodec.jsonCodec(DataFileInfo.class),
                JsonCodec.jsonCodec(DeltaLakeMergeResult.class),
                new TransactionLogWriterFactory(
                        new TransactionLogSynchronizerManager(ImmutableMap.of(), new NoIsolationSynchronizer(hdfsFileSystemFactory))),
                new TestingNodeManager(),
                checkpointWriterManager,
                new CachingExtendedStatisticsAccess(
                        new MetaDirStatisticsAccess(HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY, new JsonCodecFactory().jsonCodec(ExtendedStatistics.class))),
                true,
                false,
                new NodeVersion("test_version"),
                new DeltaLakeTableMetadataScheduler(
                        new TestingNodeManager(),
                        InternalTypeManager.TESTING_TYPE_MANAGER,
                        new DeltaLakeFileMetastoreTableOperationsProvider(hiveMetastoreFactory),
                        Integer.MAX_VALUE,
                        new DeltaLakeConfig()),
                MoreExecutors.newDirectExecutorService());

        ConnectorSession session = testingConnectorSessionWithConfig(deltaLakeConfig);
        DeltaLakeTransactionManager deltaLakeTransactionManager = new DeltaLakeTransactionManager(metadataFactory);
        deltaLakeTransactionManager.begin(transactionHandle);
        deltaLakeTransactionManager.get(transactionHandle, session.getIdentity())
                .getSnapshot(session, TABLE_HANDLE.getSchemaTableName(), TABLE_PATH, Optional.empty());

        return new DeltaLakeSplitManager(
                typeManager,
                transactionLogAccess,
                MoreExecutors.newDirectExecutorService(),
                deltaLakeConfig,
                HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY,
                deltaLakeTransactionManager,
                new DefaultCachingHostAddressProvider());
    }

    /** Creates a minimal add-file entry with the given path and size and otherwise empty metadata. */
    private AddFileEntry addFileEntryOfSize(String path, long fileSize) {
        return new AddFileEntry(path, ImmutableMap.of(), fileSize, 0L, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty());
    }

    /**
     * Builds the split the test expects for one byte range of a file.
     *
     * <p>The split weight is {@code fileSize / splitSize}, clamped to {@code [minimumAssignedSplitWeight, 1.0]}.
     * NOTE(review): this formula is assumed to mirror DeltaLakeSplitManager's weighting; confirm if it changes.
     */
    private DeltaLakeSplit makeSplit(String path, long start, long splitSize, long fileSize, double minimumAssignedSplitWeight) {
        SplitWeight splitWeight = SplitWeight.fromProportion(Math.clamp((double) fileSize / splitSize, minimumAssignedSplitWeight, 1.0));
        return new DeltaLakeSplit(path, start, splitSize, fileSize, Optional.empty(), 0L, Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of());
    }

    /**
     * Drains every split produced for {@link #TABLE_HANDLE}, fetching batches of up to 10.
     *
     * <p>The split source is closed once it is exhausted.
     *
     * @return all splits, in the order the split source emitted them
     */
    private List<DeltaLakeSplit> getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig) throws ExecutionException, InterruptedException {
        ImmutableList.Builder<DeltaLakeSplit> splits = ImmutableList.builder();
        try (ConnectorSplitSource splitSource = splitManager.getSplits(
                transactionHandle,
                testingConnectorSessionWithConfig(deltaLakeConfig),
                TABLE_HANDLE,
                DynamicFilter.EMPTY,
                Constraint.alwaysTrue())) {
            while (!splitSource.isFinished()) {
                splits.addAll(splitSource.getNextBatch(10).get().getSplits().stream()
                        .map(DeltaLakeSplit.class::cast)
                        .toList());
            }
        }
        return splits.build();
    }

    /**
     * Creates a testing session whose property metadata comes from the given Delta Lake configuration.
     *
     * <p>The Parquet reader and writer configs use their defaults.
     */
    private ConnectorSession testingConnectorSessionWithConfig(DeltaLakeConfig deltaLakeConfig) {
        DeltaLakeSessionProperties sessionProperties = new DeltaLakeSessionProperties(deltaLakeConfig, new ParquetReaderConfig(), new ParquetWriterConfig());
        return TestingConnectorSession.builder()
                .setPropertyMetadata(sessionProperties.getSessionProperties())
                .build();
    }
}

