/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.rocksdb.RocksDBOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Tests for {@link FileStoreLookupFunction}.
 *
 * <p>Every test works on a primary-key table with columns {@code (pt INT, k INT, v BIGINT)} and
 * primary key {@code (pt, k)} that is created under a per-test temporary directory. All file
 * access goes through a {@link TraceableFileIO} so that tests can assert that no input stream
 * opened under the table directory is left open.
 */
public class FileStoreLookupFunctionTest {
    private static final Random RANDOM = new Random();

    @TempDir
    private java.nio.file.Path tempDir;

    private final String commitUser = UUID.randomUUID().toString();
    private final TraceableFileIO fileIO = new TraceableFileIO();
    private Path tablePath;
    private FileStoreLookupFunction lookupFunction;
    private FileStoreTable table;

    /** Points {@link #tablePath} at the temporary directory JUnit created for this test. */
    @BeforeEach
    public void before() throws Exception {
        tablePath = new Path(tempDir.toString());
    }

    /**
     * Creates and opens a lookup function on a partitioned table, joining on {@code k} only and
     * without dynamic partition scanning.
     *
     * @param refreshAsync value stored under {@link FlinkConnectorOptions#LOOKUP_REFRESH_ASYNC}
     */
    private void createLookupFunction(boolean refreshAsync) throws Exception {
        createLookupFunction(true, false, false, refreshAsync);
    }

    /**
     * Creates the table, builds a lookup function on it, opens the function against the temporary
     * directory and stores both in the fixture fields.
     *
     * @param isPartition whether the table is partitioned by {@code pt}
     * @param joinEqualPk whether the join key is the full primary key {@code (pt, k)}
     * @param dynamicPartition whether the scan partitions option is set to {@code max_pt()}
     * @param refreshAsync value stored under {@link FlinkConnectorOptions#LOOKUP_REFRESH_ASYNC}
     */
    private void createLookupFunction(
            boolean isPartition, boolean joinEqualPk, boolean dynamicPartition, boolean refreshAsync)
            throws Exception {
        table = createFileStoreTable(isPartition, dynamicPartition, refreshAsync);
        lookupFunction = createLookupFunction(table, joinEqualPk);
        lookupFunction.open(tempDir.toString());
    }

    /**
     * Builds (but does not open) a lookup function that projects columns {@code pt} and
     * {@code k}.
     *
     * @param table table to look up in
     * @param joinEqualPk when {@code true} the join key indexes are {@code {0, 1}}, otherwise
     *     only {@code {1}}
     * @return the new, unopened lookup function
     */
    private FileStoreLookupFunction createLookupFunction(FileStoreTable table, boolean joinEqualPk) {
        int[] projection = {0, 1};
        int[] joinKeyIndex = joinEqualPk ? new int[] {0, 1} : new int[] {1};
        return new FileStoreLookupFunction(table, projection, joinKeyIndex, null, null);
    }

    /**
     * Creates the test table at {@link #tablePath} with two buckets and a snapshot retention of
     * at least two and at most three snapshots.
     *
     * @param isPartition whether {@code pt} is declared as partition key
     * @param dynamicPartition whether to set the scan partitions option to {@code max_pt()}
     * @param refreshAsync value stored under {@link FlinkConnectorOptions#LOOKUP_REFRESH_ASYNC}
     * @return the created table
     */
    private FileStoreTable createFileStoreTable(
            boolean isPartition, boolean dynamicPartition, boolean refreshAsync) throws Exception {
        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);

        Options conf = new Options();
        conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
        conf.set(CoreOptions.BUCKET, 2);
        conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
        conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
        conf.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL, Duration.ofSeconds(1L));
        if (dynamicPartition) {
            conf.set(FlinkConnectorOptions.SCAN_PARTITIONS, "max_pt()");
        }

        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
                        new String[] {"pt", "k", "v"});
        List<String> partitionKeys =
                isPartition ? Collections.singletonList("pt") : Collections.<String>emptyList();
        Schema schema =
                new Schema(
                        rowType.getFields(),
                        partitionKeys,
                        Arrays.asList("pt", "k"),
                        conf.toMap(),
                        "");
        TableSchema tableSchema = schemaManager.createTable(schema);
        return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
    }

    /** Closes the fixture lookup function if a test created one. */
    @AfterEach
    public void close() throws Exception {
        if (lookupFunction != null) {
            lookupFunction.close();
        }
    }

    /**
     * A split whose {@code totalBuckets} field is {@code null} must still be accepted by the
     * local query executor, which afterwards reports the table's two buckets.
     */
    @Test
    public void testCompatibilityForOldVersion() throws Exception {
        createLookupFunction(false, true, false, false);
        commit(writeCommit(1));

        PrimaryKeyPartialLookupTable lookupTable =
                (PrimaryKeyPartialLookupTable) lookupFunction.lookupTable();
        PrimaryKeyPartialLookupTable.LocalQueryExecutor queryExecutor =
                (PrimaryKeyPartialLookupTable.LocalQueryExecutor) lookupTable.queryExecutor();

        DataSplit split = (DataSplit) table.newReadBuilder().newScan().plan().splits().get(0);
        // The field is cleared via reflection to obtain a split without a bucket count.
        Field totalBuckets = DataSplit.class.getDeclaredField("totalBuckets");
        totalBuckets.setAccessible(true);
        totalBuckets.set(split, null);
        Assertions.assertThat(split.totalBuckets()).isNull();

        queryExecutor.refreshSplit(split);
        Assertions.assertThat(queryExecutor.numBuckets(BinaryRow.EMPTY_ROW)).isEqualTo(2);
    }

    /** Joining on the full primary key yields a partial lookup table with a local executor. */
    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {
        createLookupFunction(false, true, false, refreshAsync);

        Assertions.assertThat(lookupFunction.lookupTable())
                .isInstanceOf(PrimaryKeyPartialLookupTable.class);
        PrimaryKeyPartialLookupTable.QueryExecutor queryExecutor =
                ((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor();
        Assertions.assertThat(queryExecutor)
                .isInstanceOf(PrimaryKeyPartialLookupTable.LocalQueryExecutor.class);
    }

    /**
     * After a {@code primary-key-lookup} service address has been registered for the table,
     * opening the function again yields a partial lookup table with a remote executor.
     */
    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    public void testDefaultRemotePartial(boolean refreshAsync) throws Exception {
        createLookupFunction(false, true, false, refreshAsync);

        ServiceManager serviceManager = new ServiceManager(fileIO, tablePath);
        serviceManager.resetService(
                "primary-key-lookup", new InetSocketAddress[] {new InetSocketAddress(1)});
        // The function was already opened by createLookupFunction; it is opened a second time
        // here, after the service registration.
        lookupFunction.open(tempDir.toString());

        Assertions.assertThat(lookupFunction.lookupTable())
                .isInstanceOf(PrimaryKeyPartialLookupTable.class);
        PrimaryKeyPartialLookupTable.QueryExecutor queryExecutor =
                ((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor();
        Assertions.assertThat(queryExecutor)
                .isInstanceOf(PrimaryKeyPartialLookupTable.RemoteQueryExecutor.class);
    }

    /** Lookups after the first and after a larger second commit leave no input stream open. */
    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    public void testLookupScanLeak(boolean refreshAsync) throws Exception {
        createLookupFunction(refreshAsync);

        commit(writeCommit(1));
        lookupFixedRow();
        assertNoOpenInputStreams();

        commit(writeCommit(10));
        lookupFixedRow();
        assertNoOpenInputStreams();
    }

    /**
     * A lookup must still succeed after four further commits were made since the previous
     * lookup; the table retains at most three snapshots.
     */
    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    public void testLookupExpiredSnapshot(boolean refreshAsync) throws Exception {
        createLookupFunction(refreshAsync);

        commit(writeCommit(1));
        lookupFixedRow();

        commit(writeCommit(2));
        commit(writeCommit(3));
        commit(writeCommit(4));
        commit(writeCommit(5));
        lookupFixedRow();
    }

    /** Same leak check as {@link #testLookupScanLeak} with scan partitions {@code max_pt()}. */
    @Test
    public void testLookupDynamicPartition() throws Exception {
        createLookupFunction(true, false, true, false);

        commit(writeCommit(1));
        lookupFixedRow();
        assertNoOpenInputStreams();

        commit(writeCommit(10));
        lookupFixedRow();
        assertNoOpenInputStreams();
    }

    /** Malformed refresh blacklist values are rejected when the lookup function is built. */
    @Test
    public void testParseWrongTimePeriodsBlacklist() throws Exception {
        FileStoreTable baseTable = createFileStoreTable(false, false, false);

        // Period written with ',' instead of '->'.
        FileStoreTable table1 = withBlacklist(baseTable, "2024-10-31 12:00,2024-10-31 16:00");
        AssertionsForClassTypes.assertThatThrownBy(() -> createLookupFunction(table1, true))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                IllegalArgumentException.class,
                                "Incorrect time periods format: [2024-10-31 12:00,2024-10-31 16:00]."));

        // Date written without dashes.
        FileStoreTable table2 = withBlacklist(baseTable, "20241031 12:00->20241031 16:00");
        AssertionsForClassTypes.assertThatThrownBy(() -> createLookupFunction(table2, true))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                IllegalArgumentException.class,
                                "Date time format error: [20241031 12:00]"));

        // Second period ends before it starts.
        FileStoreTable table3 =
                withBlacklist(
                        baseTable,
                        "2024-10-31 12:00->2024-10-31 16:00,2024-10-31 20:00->2024-10-31 18:00");
        AssertionsForClassTypes.assertThatThrownBy(() -> createLookupFunction(table3, true))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                IllegalArgumentException.class,
                                "Incorrect time period: [2024-10-31 20:00->2024-10-31 18:00]"));
    }

    /**
     * With a blacklist period running from the current minute to thirty minutes later,
     * {@code tryRefresh()} must set the next blacklist check time to one millisecond after the
     * end of the period.
     */
    @Test
    public void testCheckRefreshInBlacklist() throws Exception {
        Instant now = Instant.now();
        // Truncated to the whole minute because the pattern below has minute precision.
        Instant start = Instant.ofEpochSecond(now.getEpochSecond() / 60L * 60L);
        Instant end = start.plusSeconds(1800L);

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
        String left = start.atZone(ZoneId.systemDefault()).format(formatter);
        String right = end.atZone(ZoneId.systemDefault()).format(formatter);

        FileStoreTable blacklistedTable =
                withBlacklist(createFileStoreTable(false, false, false), left + "->" + right);
        FileStoreLookupFunction function = createLookupFunction(blacklistedTable, true);

        function.tryRefresh();

        Assertions.assertThat(function.nextBlacklistCheckTime()).isEqualTo(end.toEpochMilli() + 1L);
    }

    /**
     * Returns a copy of {@code base} whose refresh time periods blacklist option is set to
     * {@code periods}.
     */
    private static FileStoreTable withBlacklist(FileStoreTable base, String periods) {
        return base.copy(
                Collections.singletonMap(
                        FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(), periods));
    }

    /** Looks up the fixed row {@code (1, 1, 10L)} through the fixture lookup function. */
    private void lookupFixedRow() throws Exception {
        lookupFunction.lookup((RowData) new FlinkRowData(GenericRow.of(1, 1, 10L)));
    }

    /** Asserts that no traced input stream whose path contains the temp directory is open. */
    private void assertNoOpenInputStreams() {
        String root = tempDir.toString();
        Assertions.assertThat(
                        TraceableFileIO.openInputStreams(s -> s.toString().contains(root)).size())
                .isEqualTo(0);
    }

    /**
     * Commits the given messages to the fixture table with this test's commit user.
     *
     * @param messages commit messages, as produced by {@link #writeCommit(int)}
     */
    private void commit(List<CommitMessage> messages) throws Exception {
        // try-with-resources so that the commit is closed even if committing fails.
        try (TableCommitImpl commit = table.newCommit(commitUser)) {
            commit.commit(messages);
        }
    }

    /**
     * Writes {@code number} random rows to the fixture table, preparing a commit after each row.
     *
     * @param number how many rows to write
     * @return all commit messages collected from the individual {@code prepareCommit} calls
     */
    private List<CommitMessage> writeCommit(int number) throws Exception {
        List<CommitMessage> messages = new ArrayList<>();
        // The writer is closed once all messages have been prepared.
        try (StreamTableWrite writer = table.newStreamWriteBuilder().newWrite()) {
            for (int i = 0; i < number; ++i) {
                writer.write(randomRow());
                messages.addAll(writer.prepareCommit(true, (long) i));
            }
        }
        return messages;
    }

    /** Returns a row with {@code pt} and {@code k} in {@code [0, 100)} and a random {@code v}. */
    private InternalRow randomRow() {
        return GenericRow.of(RANDOM.nextInt(100), RANDOM.nextInt(100), RANDOM.nextLong());
    }
}

