/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.lookup.DynamicPartitionLevelLoader;
import org.apache.paimon.flink.lookup.PartitionLoader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.rocksdb.RocksDBOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class DynamicPartitionLevelLoaderTest {
    @TempDir
    private java.nio.file.Path tempDir;
    private final String commitUser = UUID.randomUUID().toString();
    private final TraceableFileIO fileIO = new TraceableFileIO();
    private Path tablePath;
    private FileStoreTable table;

    @BeforeEach
    public void before() throws Exception {
        this.tablePath = new Path(this.tempDir.toString());
    }

    @Test
    public void testGetMaxPartitions() throws Exception {
        List<String> partitionKeys = Arrays.asList("pt1", "pt2", "pt3");
        List<String> primaryKeys = Arrays.asList("pt1", "pt2", "pt3", "k");
        this.table = this.createFileStoreTable(partitionKeys, primaryKeys, Collections.emptyMap());
        TableWriteImpl write = this.table.newWrite(this.commitUser);
        TableCommitImpl commit = this.table.newCommit(this.commitUser);
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), 16, 2, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), 15, 1, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2024"), 15, 1, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), 15, 2, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), 16, 1, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2024"), 16, 1, 1, 1L}));
        commit.commit(1L, write.prepareCommit(true, 1L));
        HashMap<String, String> customOptions = new HashMap<String, String>();
        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt()");
        this.table = this.table.copy(customOptions);
        DynamicPartitionLevelLoader partitionLoader = (DynamicPartitionLevelLoader)PartitionLoader.of((FileStoreTable)this.table);
        partitionLoader.open();
        List partitions = partitionLoader.getMaxPartitions();
        Assertions.assertThat((int)partitions.size()).isEqualTo(4);
        Assertions.assertThat(this.partitionsToString(partitions)).hasSameElementsAs(Arrays.asList("2025/16/2", "2025/16/1", "2025/15/2", "2025/15/1"));
        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt(),pt2=max_pt()");
        this.table = this.table.copy(customOptions);
        partitionLoader = (DynamicPartitionLevelLoader)PartitionLoader.of((FileStoreTable)this.table);
        partitionLoader.open();
        partitions = partitionLoader.getMaxPartitions();
        Assertions.assertThat((int)partitions.size()).isEqualTo(2);
        Assertions.assertThat(this.partitionsToString(partitions)).hasSameElementsAs(Arrays.asList("2025/16/2", "2025/16/1"));
        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt(),pt2=max_pt(),pt3=max_pt()");
        this.table = this.table.copy(customOptions);
        partitionLoader = (DynamicPartitionLevelLoader)PartitionLoader.of((FileStoreTable)this.table);
        partitionLoader.open();
        partitions = partitionLoader.getMaxPartitions();
        Assertions.assertThat((int)partitions.size()).isEqualTo(1);
        Assertions.assertThat(this.partitionsToString(partitions)).hasSameElementsAs(Arrays.asList("2025/16/2"));
        write.close();
        commit.close();
    }

    @Test
    public void testGetMaxPartitionsWhenNullPartition() throws Exception {
        List<String> partitionKeys = Arrays.asList("pt1", "pt2", "pt3");
        this.table = this.createFileStoreTable(partitionKeys, Collections.emptyList(), Collections.emptyMap());
        TableWriteImpl write = this.table.newWrite(this.commitUser);
        TableCommitImpl commit = this.table.newCommit(this.commitUser);
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), 15, 1, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), 15, 2, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), 15, null, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), null, 1, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2024"), 15, 1, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{null, 16, 1, 1, 1L}));
        commit.commit(1L, write.prepareCommit(true, 1L));
        HashMap<String, String> customOptions = new HashMap<String, String>();
        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt(),pt2=max_pt()");
        this.table = this.table.copy(customOptions);
        DynamicPartitionLevelLoader partitionLoader = (DynamicPartitionLevelLoader)PartitionLoader.of((FileStoreTable)this.table);
        partitionLoader.open();
        List partitions = partitionLoader.getMaxPartitions();
        Assertions.assertThat((int)partitions.size()).isEqualTo(3);
        Assertions.assertThat(this.partitionsToString(partitions)).hasSameElementsAs(Arrays.asList("2025/15/2", "2025/15/1", "2025/15/null"));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2026"), null, null, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2026"), null, 1, 1, 1L}));
        commit.commit(2L, write.prepareCommit(true, 2L));
        partitionLoader = (DynamicPartitionLevelLoader)PartitionLoader.of((FileStoreTable)this.table);
        partitionLoader.open();
        partitions = partitionLoader.getMaxPartitions();
        Assertions.assertThat((int)partitions.size()).isEqualTo(2);
        Assertions.assertThat(this.partitionsToString(partitions)).hasSameElementsAs(Arrays.asList("2026/null/1", "2026/null/null"));
        write.close();
        commit.close();
    }

    @Test
    public void testWrongConfig() throws Exception {
        List<String> partitionKeys = Arrays.asList("pt1", "pt2", "pt3");
        this.table = this.createFileStoreTable(partitionKeys, Collections.emptyList(), Collections.emptyMap());
        TableWriteImpl write = this.table.newWrite(this.commitUser);
        TableCommitImpl commit = this.table.newCommit(this.commitUser);
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), 15, 1, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), 15, 2, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), 15, null, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025"), null, 1, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2024"), 15, 1, 1, 1L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{null, 16, 1, 1, 1L}));
        commit.commit(1L, write.prepareCommit(true, 1L));
        HashMap<String, String> customOptions = new HashMap<String, String>();
        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt(),pt3=max_pt()");
        this.table = this.table.copy(customOptions);
        Assertions.assertThatCode(() -> PartitionLoader.of((FileStoreTable)this.table)).hasMessage("Max load level is 0, but partition field pt3 with a higher level 2 sets MAX_PT.");
        write.close();
        commit.close();
    }

    private FileStoreTable createFileStoreTable(List<String> partitionKeys, List<String> primaryKeys, Map<String, String> customOptions) throws Exception {
        SchemaManager schemaManager = new SchemaManager((FileIO)this.fileIO, this.tablePath);
        Options conf = new Options(customOptions);
        conf.set(CoreOptions.BUCKET, (Object)2);
        conf.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL, (Object)Duration.ofSeconds(1L));
        if (primaryKeys.isEmpty()) {
            conf.set(CoreOptions.BUCKET_KEY.key(), "k");
        }
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}, (String[])new String[]{"pt1", "pt2", "pt3", "k", "v"});
        Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), "");
        TableSchema tableSchema = schemaManager.createTable(schema);
        return FileStoreTableFactory.create((FileIO)this.fileIO, (Path)new Path(this.tempDir.toString()), (TableSchema)tableSchema);
    }

    private List<String> partitionsToString(List<BinaryRow> partitions) {
        return partitions.stream().map(partition -> InternalRowPartitionComputer.partToSimpleString((RowType)this.table.rowType().project(this.table.partitionKeys()), (BinaryRow)partition, (String)"/", (int)200)).collect(Collectors.toList());
    }
}

