/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.RocksDBOptions;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class FileStoreLookupFunctionTest {
    private static final Random RANDOM = new Random();
    private final String commitUser = UUID.randomUUID().toString();
    private final TraceableFileIO fileIO = new TraceableFileIO();
    private FileStoreLookupFunction fileStoreLookupFunction;
    private FileStoreTable fileStoreTable;
    @TempDir
    private java.nio.file.Path tempDir;

    @BeforeEach
    public void before() throws Exception {
        Path path = new Path(this.tempDir.toString());
        SchemaManager schemaManager = new SchemaManager((FileIO)this.fileIO, path);
        Options conf = new Options();
        conf.set(CoreOptions.BUCKET, (Object)2);
        conf.set(CoreOptions.WRITE_BUFFER_SIZE, (Object)new MemorySize(12288L));
        conf.set(CoreOptions.PAGE_SIZE, (Object)new MemorySize(4096L));
        conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, (Object)3);
        conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, (Object)2);
        conf.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL, (Object)Duration.ofSeconds(1L));
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}, (String[])new String[]{"pt", "k", "v"});
        Schema schema = new Schema(rowType.getFields(), Collections.singletonList("pt"), Arrays.asList("pt", "k"), conf.toMap(), "");
        TableSchema tableSchema = schemaManager.createTable(schema);
        this.fileStoreTable = FileStoreTableFactory.create((FileIO)this.fileIO, (Path)new Path(this.tempDir.toString()), (TableSchema)tableSchema);
        this.fileStoreLookupFunction = new FileStoreLookupFunction((Table)this.fileStoreTable, new int[]{0, 1}, new int[]{1}, null);
        this.fileStoreLookupFunction.open(this.tempDir.toString());
    }

    @AfterEach
    public void close() throws Exception {
        if (this.fileStoreLookupFunction != null) {
            this.fileStoreLookupFunction.close();
        }
    }

    @Test
    public void testLookupScanLeak() throws Exception {
        this.commit(this.writeCommit(1));
        this.fileStoreLookupFunction.lookup((RowData)new FlinkRowData((InternalRow)GenericRow.of((Object[])new Object[]{1, 1, 10L})));
        Assertions.assertThat((int)TraceableFileIO.openInputStreams(s -> s.toString().contains(this.tempDir.toString())).size()).isEqualTo(0);
        this.commit(this.writeCommit(10));
        this.fileStoreLookupFunction.lookup((RowData)new FlinkRowData((InternalRow)GenericRow.of((Object[])new Object[]{1, 1, 10L})));
        Assertions.assertThat((int)TraceableFileIO.openInputStreams(s -> s.toString().contains(this.tempDir.toString())).size()).isEqualTo(0);
    }

    @Test
    public void testLookupExpiredSnapshot() throws Exception {
        this.commit(this.writeCommit(1));
        this.fileStoreLookupFunction.lookup((RowData)new FlinkRowData((InternalRow)GenericRow.of((Object[])new Object[]{1, 1, 10L})));
        this.commit(this.writeCommit(2));
        this.commit(this.writeCommit(3));
        this.commit(this.writeCommit(4));
        this.commit(this.writeCommit(5));
        this.fileStoreLookupFunction.lookup((RowData)new FlinkRowData((InternalRow)GenericRow.of((Object[])new Object[]{1, 1, 10L})));
    }

    private void commit(List<CommitMessage> messages) throws Exception {
        TableCommitImpl commit = this.fileStoreTable.newCommit(this.commitUser);
        commit.commit(messages);
        commit.close();
    }

    private List<CommitMessage> writeCommit(int number) throws Exception {
        ArrayList<CommitMessage> messages = new ArrayList<CommitMessage>();
        StreamTableWrite writer = this.fileStoreTable.newStreamWriteBuilder().newWrite();
        for (int i = 0; i < number; ++i) {
            writer.write(this.randomRow());
            messages.addAll(writer.prepareCommit(true, (long)i));
        }
        return messages;
    }

    private InternalRow randomRow() {
        return GenericRow.of((Object[])new Object[]{RANDOM.nextInt(100), RANDOM.nextInt(100), RANDOM.nextLong()});
    }
}

