/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.lookup.FullCacheLookupTable;
import org.apache.paimon.flink.lookup.NoPrimaryKeyLookupTable;
import org.apache.paimon.flink.lookup.PrimaryKeyLookupTable;
import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable;
import org.apache.paimon.flink.lookup.SecondaryIndexLookupTable;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
import org.apache.paimon.testutils.junit.parameterized.Parameters;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SortUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;

@ExtendWith(value={ParameterizedTestExtension.class})
public class LookupTableTest
extends TableTestBase {
    private final boolean inMemory;
    @TempDir
    java.nio.file.Path tempDir;
    private RowType rowType;
    private IOManager ioManager;
    private FullCacheLookupTable table;

    public LookupTableTest(boolean inMemory) {
        this.inMemory = inMemory;
    }

    @Parameters(name="{0}")
    public static List<Boolean> getVarSeg() {
        return Arrays.asList(true, false);
    }

    @BeforeEach
    public void before() throws IOException {
        this.rowType = RowType.of((DataType[])new DataType[]{new IntType(), new IntType(), new IntType()});
        this.ioManager = new IOManagerImpl(new String[]{this.tempDir.toString()});
    }

    @AfterEach
    public void after() throws IOException {
        if (this.table != null) {
            this.table.close();
        }
    }

    private FileStoreTable createTable(List<String> primaryKeys, Options options) throws Exception {
        if (this.inMemory) {
            options.set(FlinkConnectorOptions.LOOKUP_CACHE_MODE, (Object)FlinkConnectorOptions.LookupCacheMode.MEMORY);
        }
        Identifier identifier = new Identifier("default", "t");
        Schema schema = new Schema(this.rowType.getFields(), Collections.emptyList(), primaryKeys, options.toMap(), null);
        this.catalog.createTable(identifier, schema, false);
        return (FileStoreTable)this.catalog.getTable(identifier);
    }

    @TestTemplate
    public void testPkTable() throws Exception {
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), new Options());
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f0"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        this.table.close();
        this.table.open();
        if (!this.inMemory) {
            FullCacheLookupTable.TableBulkLoader bulkLoader = this.table.createBulkLoader();
            bulkLoader.write(new byte[]{1}, new byte[]{1});
            Assertions.assertThatThrownBy(() -> bulkLoader.write(new byte[]{1}, new byte[]{2})).hasMessageContaining("Keys must be added in strict ascending order");
        }
        ArrayList<Pair> records = new ArrayList<Pair>();
        for (int i = 1; i <= 100000; ++i) {
            InternalRow row = LookupTableTest.row(i, 11 * i, 111 * i);
            records.add(Pair.of((Object)this.table.toKeyBytes(row), (Object)this.table.toValueBytes(row)));
        }
        records.sort((o1, o2) -> SortUtil.compareBinary((byte[])((byte[])o1.getKey()), (byte[])((byte[])o2.getKey())));
        FullCacheLookupTable.TableBulkLoader bulkLoader = this.table.createBulkLoader();
        for (Pair kv : records) {
            bulkLoader.write((byte[])kv.getKey(), (byte[])kv.getValue());
        }
        bulkLoader.finish();
        for (int i = 1; i <= 100000; ++i) {
            List result = this.table.get(LookupTableTest.row(i));
            Assertions.assertThat((List)result).hasSize(1);
            LookupTableTest.assertRow((InternalRow)result.get(0), i, 11 * i, 111 * i);
        }
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 22, 222)).iterator());
        List result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22, 222);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(RowKind.DELETE, new Object[]{1, 11, 111})).iterator());
        Assertions.assertThat((List)this.table.get(LookupTableTest.row(1))).hasSize(0);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(RowKind.DELETE, new Object[]{3, 33, 333})).iterator());
        Assertions.assertThat((List)this.table.get(LookupTableTest.row(3))).hasSize(0);
    }

    @TestTemplate
    public void testPkTableWithSequenceField() throws Exception {
        Options options = new Options();
        options.set(CoreOptions.SEQUENCE_FIELD, (Object)"f1");
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), options);
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f0"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        this.table.close();
        this.table.open();
        ArrayList<Pair> records = new ArrayList<Pair>();
        for (int i = 1; i <= 10; ++i) {
            InternalRow row = LookupTableTest.row(i, 11 * i, 111 * i);
            records.add(Pair.of((Object)this.table.toKeyBytes(row), (Object)this.table.toValueBytes(row)));
        }
        records.sort((o1, o2) -> SortUtil.compareBinary((byte[])((byte[])o1.getKey()), (byte[])((byte[])o2.getKey())));
        FullCacheLookupTable.TableBulkLoader bulkLoader = this.table.createBulkLoader();
        for (Pair kv : records) {
            bulkLoader.write((byte[])kv.getKey(), (byte[])kv.getValue());
        }
        bulkLoader.finish();
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 22, 222)).iterator());
        List result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22, 222);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 11, 333)).iterator());
        result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22, 222);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(RowKind.DELETE, new Object[]{1, 11, 111})).iterator());
        Assertions.assertThat((List)this.table.get(LookupTableTest.row(1))).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22, 222);
    }

    @TestTemplate
    public void testPkTableWithSequenceFieldProjection() throws Exception {
        Options options = new Options();
        options.set(CoreOptions.SEQUENCE_FIELD, (Object)"f2");
        options.set(CoreOptions.BUCKET, (Object)1);
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), options);
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1}, null, null, this.tempDir.toFile(), Collections.singletonList("f0"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        this.write((Table)storeTable, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 11, 111})});
        this.table.refresh();
        List result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 11);
        this.write((Table)storeTable, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 22, 222})});
        this.table.refresh();
        result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22);
        this.write((Table)storeTable, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 33, 111})});
        this.table.refresh();
        result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22);
    }

    @TestTemplate
    public void testPkTablePkFilter() throws Exception {
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), new Options());
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, new PredicateBuilder(RowType.of((DataType[])new DataType[]{DataTypes.INT()})).lessThan(0, (Object)3), this.tempDir.toFile(), Collections.singletonList("f0"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        this.table.close();
        this.table.open();
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 11, 111)).iterator());
        List result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 11, 111);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 22, 222)).iterator());
        result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22, 222);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(RowKind.DELETE, new Object[]{1, 11, 111})).iterator());
        Assertions.assertThat((List)this.table.get(LookupTableTest.row(1))).hasSize(0);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(3, 33, 333)).iterator());
        Assertions.assertThat((List)this.table.get(LookupTableTest.row(3))).hasSize(0);
    }

    @TestTemplate
    public void testPkTableNonPkFilter() throws Exception {
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), new Options());
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, new PredicateBuilder(RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT()})).lessThan(1, (Object)22), this.tempDir.toFile(), Collections.singletonList("f0"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        this.table.close();
        this.table.open();
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 11, 111)).iterator());
        List result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 11, 111);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 22, 222)).iterator());
        result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(0);
    }

    @TestTemplate
    public void testSecKeyTable() throws Exception {
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), new Options());
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f1"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        this.table.close();
        this.table.open();
        ArrayList<Pair> records = new ArrayList<Pair>();
        Random rnd = new Random();
        HashMap<Integer, Set> secKeyToPk = new HashMap<Integer, Set>();
        for (int i = 1; i <= 100000; ++i) {
            int secKey = rnd.nextInt(i);
            InternalRow internalRow = LookupTableTest.row(i, secKey, 111 * i);
            records.add(Pair.of((Object)this.table.toKeyBytes(internalRow), (Object)this.table.toValueBytes(internalRow)));
            secKeyToPk.computeIfAbsent(secKey, k -> new HashSet()).add(i);
        }
        records.sort((o1, o2) -> SortUtil.compareBinary((byte[])((byte[])o1.getKey()), (byte[])((byte[])o2.getKey())));
        FullCacheLookupTable.TableBulkLoader bulkLoader = this.table.createBulkLoader();
        for (Pair pair : records) {
            bulkLoader.write((byte[])pair.getKey(), (byte[])pair.getValue());
        }
        bulkLoader.finish();
        for (Map.Entry entry : secKeyToPk.entrySet()) {
            List result = this.table.get(LookupTableTest.row(entry.getKey()));
            Assertions.assertThat(result.stream().map(row -> row.getInt(0))).containsExactlyInAnyOrderElementsOf((Iterable)entry.getValue());
        }
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 22, 222)).iterator());
        List result = this.table.get(LookupTableTest.row(22));
        Assertions.assertThat(result.stream().map(row -> row.getInt(0))).contains((Object[])new Integer[]{1});
    }

    @TestTemplate
    public void testSecKeyTableWithSequenceField() throws Exception {
        Options options = new Options();
        options.set(CoreOptions.SEQUENCE_FIELD, (Object)"f1");
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), options);
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f1"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        this.table.close();
        this.table.open();
        ArrayList<Pair> records = new ArrayList<Pair>();
        Random rnd = new Random();
        HashMap<Integer, Set> secKeyToPk = new HashMap<Integer, Set>();
        for (int i = 1; i <= 10; ++i) {
            int secKey = rnd.nextInt(i);
            JoinedRow joinedRow = new JoinedRow(LookupTableTest.row(i, secKey, 111 * i), (InternalRow)GenericRow.of((Object[])new Object[]{-1L}));
            records.add(Pair.of((Object)this.table.toKeyBytes((InternalRow)joinedRow), (Object)this.table.toValueBytes((InternalRow)joinedRow)));
            secKeyToPk.computeIfAbsent(secKey, k -> new HashSet()).add(i);
        }
        records.sort((o1, o2) -> SortUtil.compareBinary((byte[])((byte[])o1.getKey()), (byte[])((byte[])o2.getKey())));
        FullCacheLookupTable.TableBulkLoader bulkLoader = this.table.createBulkLoader();
        for (Pair pair : records) {
            bulkLoader.write((byte[])pair.getKey(), (byte[])pair.getValue());
        }
        bulkLoader.finish();
        for (Map.Entry entry : secKeyToPk.entrySet()) {
            List result = this.table.get(LookupTableTest.row(entry.getKey()));
            Assertions.assertThat(result.stream().map(row -> row.getInt(0))).containsExactlyInAnyOrderElementsOf((Iterable)entry.getValue());
        }
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 22, 222)).iterator());
        List result = this.table.get(LookupTableTest.row(22));
        Assertions.assertThat(result.stream().map(row -> row.getInt(0))).contains((Object[])new Integer[]{1});
        Assertions.assertThat(result.stream().map(InternalRow::getFieldCount)).allMatch(n -> n == 3);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 11, 333)).iterator());
        result = this.table.get(LookupTableTest.row(22));
        Assertions.assertThat(result.stream().map(row -> row.getInt(2))).doesNotContain((Object[])new Integer[]{333});
    }

    @TestTemplate
    public void testSecKeyTablePkFilter() throws Exception {
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), new Options());
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, new PredicateBuilder(RowType.of((DataType[])new DataType[]{DataTypes.INT()})).lessThan(0, (Object)3), this.tempDir.toFile(), Collections.singletonList("f1"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        this.table.close();
        this.table.open();
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 11, 111)).iterator());
        List result = this.table.get(LookupTableTest.row(11));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 11, 111);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 22, 222)).iterator());
        Assertions.assertThat((List)this.table.get(LookupTableTest.row(11))).hasSize(0);
        result = this.table.get(LookupTableTest.row(22));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22, 222);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(2, 22, 222)).iterator());
        result = this.table.get(LookupTableTest.row(22));
        Assertions.assertThat((List)result).hasSize(2);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22, 222);
        LookupTableTest.assertRow((InternalRow)result.get(1), 2, 22, 222);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(RowKind.DELETE, new Object[]{2, 22, 222})).iterator());
        result = this.table.get(LookupTableTest.row(22));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22, 222);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(3, 33, 333)).iterator());
        Assertions.assertThat((List)this.table.get(LookupTableTest.row(33))).hasSize(0);
    }

    @TestTemplate
    public void testNoPrimaryKeyTable() throws Exception {
        FileStoreTable storeTable = this.createTable(Collections.emptyList(), new Options());
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f1"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        this.table.close();
        this.table.open();
        ArrayList<Pair> records = new ArrayList<Pair>();
        Random rnd = new Random();
        HashMap<Integer, List> joinKeyToFirst = new HashMap<Integer, List>();
        for (int i = 1; i <= 100000; ++i) {
            int joinKey = rnd.nextInt(i);
            InternalRow internalRow = LookupTableTest.row(i, joinKey, 111 * i);
            records.add(Pair.of((Object)this.table.toKeyBytes(internalRow), (Object)this.table.toValueBytes(internalRow)));
            joinKeyToFirst.computeIfAbsent(joinKey, k -> new ArrayList()).add(i);
        }
        records.sort((o1, o2) -> SortUtil.compareBinary((byte[])((byte[])o1.getKey()), (byte[])((byte[])o2.getKey())));
        FullCacheLookupTable.TableBulkLoader bulkLoader = this.table.createBulkLoader();
        for (Pair pair : records) {
            bulkLoader.write((byte[])pair.getKey(), (byte[])pair.getValue());
        }
        bulkLoader.finish();
        for (Map.Entry entry : joinKeyToFirst.entrySet()) {
            List result = this.table.get(LookupTableTest.row(entry.getKey()));
            Assertions.assertThat(result.stream().map(row -> row.getInt(0))).containsExactlyInAnyOrderElementsOf((Iterable)entry.getValue());
        }
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 22, 333)).iterator());
        List result = this.table.get(LookupTableTest.row(22));
        Assertions.assertThat(result.stream().map(row -> row.getInt(0))).contains((Object[])new Integer[]{1});
    }

    @TestTemplate
    public void testNoPrimaryKeyTableFilter() throws Exception {
        FileStoreTable storeTable = this.createTable(Collections.emptyList(), new Options());
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, new PredicateBuilder(RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.INT()})).lessThan(2, (Object)222), this.tempDir.toFile(), Collections.singletonList("f1"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        this.table.close();
        this.table.open();
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 11, 333)).iterator());
        List result = this.table.get(LookupTableTest.row(11));
        Assertions.assertThat((List)result).hasSize(0);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 11, 111)).iterator());
        result = this.table.get(LookupTableTest.row(11));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 11, 111);
        this.table.refresh(Collections.singletonList(LookupTableTest.row(1, 11, 111)).iterator());
        result = this.table.get(LookupTableTest.row(11));
        Assertions.assertThat((List)result).hasSize(2);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 11, 111);
        LookupTableTest.assertRow((InternalRow)result.get(1), 1, 11, 111);
    }

    @TestTemplate
    public void testPkTableWithCacheRowFilter() throws Exception {
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), new Options());
        this.writeWithBucketAssigner((Table)storeTable, row -> 0, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 11, 111}), GenericRow.of((Object[])new Object[]{2, 22, 222})});
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f0"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        Assertions.assertThat((Object)this.table).isInstanceOf(PrimaryKeyLookupTable.class);
        this.table.specifyCacheRowFilter(row -> row.getInt(0) < 2);
        this.table.open();
        List res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{1}));
        Assertions.assertThat((List)res).hasSize(1);
        LookupTableTest.assertRow((InternalRow)res.get(0), 1, 11, 111);
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{2}));
        Assertions.assertThat((List)res).isEmpty();
        this.writeWithBucketAssigner((Table)storeTable, row -> 0, new InternalRow[]{GenericRow.of((Object[])new Object[]{0, 0, 0}), GenericRow.of((Object[])new Object[]{3, 33, 333})});
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{0}));
        Assertions.assertThat((List)res).isEmpty();
        this.table.refresh();
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{0}));
        Assertions.assertThat((List)res).hasSize(1);
        LookupTableTest.assertRow((InternalRow)res.get(0), 0, 0, 0);
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{3}));
        Assertions.assertThat((List)res).isEmpty();
    }

    @TestTemplate
    public void testRefreshExecutorRebuildAfterReopen() throws Exception {
        Options options = new Options();
        options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, (Object)true);
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), options);
        this.writeWithBucketAssigner((Table)storeTable, row -> 0, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 11, 111}), GenericRow.of((Object[])new Object[]{2, 22, 222})});
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f0"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        Assertions.assertThat((Object)this.table).isInstanceOf(PrimaryKeyLookupTable.class);
        this.table.open();
        this.table.close();
        this.table.open();
        List res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{1}));
        Assertions.assertThat((List)res).hasSize(1);
        LookupTableTest.assertRow((InternalRow)res.get(0), 1, 11, 111);
        this.writeWithBucketAssigner((Table)storeTable, row -> 0, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 22, 222})});
        this.table.refresh();
        Assertions.assertThat((Future)this.table.getRefreshFuture()).isNotNull();
        this.table.getRefreshFuture().get();
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{1}));
        Assertions.assertThat((List)res).hasSize(1);
        LookupTableTest.assertRow((InternalRow)res.get(0), 1, 22, 222);
    }

    @TestTemplate
    public void testNoPkTableWithCacheRowFilter() throws Exception {
        FileStoreTable storeTable = this.createTable(Collections.emptyList(), new Options());
        this.writeWithBucketAssigner((Table)storeTable, row -> 0, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 11, 111}), GenericRow.of((Object[])new Object[]{2, 22, 222})});
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f0"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        Assertions.assertThat((Object)this.table).isInstanceOf(NoPrimaryKeyLookupTable.class);
        this.table.specifyCacheRowFilter(row -> row.getInt(0) < 2);
        this.table.open();
        List res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{1}));
        Assertions.assertThat((List)res).hasSize(1);
        LookupTableTest.assertRow((InternalRow)res.get(0), 1, 11, 111);
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{2}));
        Assertions.assertThat((List)res).isEmpty();
        this.writeWithBucketAssigner((Table)storeTable, row -> 0, new InternalRow[]{GenericRow.of((Object[])new Object[]{0, 0, 0}), GenericRow.of((Object[])new Object[]{3, 33, 333})});
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{0}));
        Assertions.assertThat((List)res).isEmpty();
        this.table.refresh();
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{0}));
        Assertions.assertThat((List)res).hasSize(1);
        LookupTableTest.assertRow((InternalRow)res.get(0), 0, 0, 0);
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{3}));
        Assertions.assertThat((List)res).isEmpty();
    }

    @TestTemplate
    public void testSecKeyTableWithCacheRowFilter() throws Exception {
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), new Options());
        this.writeWithBucketAssigner((Table)storeTable, row -> 0, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 11, 111}), GenericRow.of((Object[])new Object[]{2, 22, 222})});
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f1"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        Assertions.assertThat((Object)this.table).isInstanceOf(SecondaryIndexLookupTable.class);
        this.table.specifyCacheRowFilter(row -> row.getInt(1) < 22);
        this.table.open();
        List res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{11}));
        Assertions.assertThat((List)res).hasSize(1);
        LookupTableTest.assertRow((InternalRow)res.get(0), 1, 11, 111);
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{22}));
        Assertions.assertThat((List)res).isEmpty();
        this.writeWithBucketAssigner((Table)storeTable, row -> 0, new InternalRow[]{GenericRow.of((Object[])new Object[]{0, 0, 0}), GenericRow.of((Object[])new Object[]{3, 33, 333})});
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{0}));
        Assertions.assertThat((List)res).isEmpty();
        this.table.refresh();
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{0}));
        Assertions.assertThat((List)res).hasSize(1);
        LookupTableTest.assertRow((InternalRow)res.get(0), 0, 0, 0);
        res = this.table.get((InternalRow)GenericRow.of((Object[])new Object[]{33}));
        Assertions.assertThat((List)res).isEmpty();
    }

    @TestTemplate
    public void testPartialLookupTable() throws Exception {
        FileStoreTable dimTable = this.createDimTable();
        PrimaryKeyPartialLookupTable table = PrimaryKeyPartialLookupTable.createLocalTable((FileStoreTable)dimTable, (int[])new int[]{0, 1, 2}, (File)this.tempDir.toFile(), (List)ImmutableList.of((Object)"pk1", (Object)"pk2"), null);
        table.open();
        List result = table.get(LookupTableTest.row(1, -1));
        Assertions.assertThat((List)result).hasSize(0);
        this.write((Table)dimTable, this.ioManager, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, -1, 11}), GenericRow.of((Object[])new Object[]{2, -2, 22})});
        result = table.get(LookupTableTest.row(1, -1));
        Assertions.assertThat((List)result).hasSize(0);
        table.refresh();
        result = table.get(LookupTableTest.row(1, -1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, -1, 11);
        result = table.get(LookupTableTest.row(2, -2));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 2, -2, 22);
        this.write((Table)dimTable, this.ioManager, new InternalRow[]{GenericRow.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{1, -1, 11})});
        table.refresh();
        result = table.get(LookupTableTest.row(1, -1));
        Assertions.assertThat((List)result).hasSize(0);
    }

    @TestTemplate
    public void testPartialLookupTableWithRowFilter() throws Exception {
        Options options = new Options();
        options.set(CoreOptions.BUCKET.key(), "2");
        options.set(CoreOptions.BUCKET_KEY.key(), "f0");
        FileStoreTable dimTable = this.createTable(Collections.singletonList("f0"), options);
        this.write((Table)dimTable, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 11, 111}), GenericRow.of((Object[])new Object[]{2, 22, 222})});
        PrimaryKeyPartialLookupTable table = PrimaryKeyPartialLookupTable.createLocalTable((FileStoreTable)dimTable, (int[])new int[]{0, 2}, (File)this.tempDir.toFile(), (List)ImmutableList.of((Object)"f0"), null);
        table.specifyCacheRowFilter(row -> row.getInt(0) < 2);
        table.open();
        List result = table.get(LookupTableTest.row(1, 11));
        Assertions.assertThat((List)result).hasSize(1);
        result = table.get(LookupTableTest.row(2, 22));
        Assertions.assertThat((List)result).isEmpty();
    }

    @TestTemplate
    public void testPartialLookupTableWithProjection() throws Exception {
        FileStoreTable dimTable = this.createDimTable();
        PrimaryKeyPartialLookupTable table = PrimaryKeyPartialLookupTable.createLocalTable((FileStoreTable)dimTable, (int[])new int[]{2, 1}, (File)this.tempDir.toFile(), (List)ImmutableList.of((Object)"pk1", (Object)"pk2"), null);
        table.open();
        table.close();
        table.open();
        List result = table.get(LookupTableTest.row(1, -1));
        Assertions.assertThat((List)result).hasSize(0);
        this.write((Table)dimTable, this.ioManager, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, -1, 11}), GenericRow.of((Object[])new Object[]{2, -2, 22})});
        result = table.get(LookupTableTest.row(1, -1));
        Assertions.assertThat((List)result).hasSize(0);
        table.refresh();
        result = table.get(LookupTableTest.row(1, -1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 11, -1);
        result = table.get(LookupTableTest.row(2, -2));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 22, -2);
    }

    @TestTemplate
    public void testPartialLookupTableJoinKeyOrder() throws Exception {
        FileStoreTable dimTable = this.createDimTable();
        PrimaryKeyPartialLookupTable table = PrimaryKeyPartialLookupTable.createLocalTable((FileStoreTable)dimTable, (int[])new int[]{2, 1}, (File)this.tempDir.toFile(), (List)ImmutableList.of((Object)"pk2", (Object)"pk1"), null);
        table.open();
        table.close();
        table.open();
        List result = table.get(LookupTableTest.row(-1, 1));
        Assertions.assertThat((List)result).hasSize(0);
        this.write((Table)dimTable, this.ioManager, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, -1, 11}), GenericRow.of((Object[])new Object[]{2, -2, 22})});
        result = table.get(LookupTableTest.row(-1, 1));
        Assertions.assertThat((List)result).hasSize(0);
        table.refresh();
        result = table.get(LookupTableTest.row(-1, 1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 11, -1);
        result = table.get(LookupTableTest.row(-2, 2));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 22, -2);
    }

    @TestTemplate
    public void testPKLookupTableNotRefreshAsync() throws Exception {
        this.innerTestPKLookupTableRefreshAsync(false);
    }

    @TestTemplate
    public void testPKLookupTableRefreshAsync() throws Exception {
        this.innerTestPKLookupTableRefreshAsync(true);
    }

    private void innerTestPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception {
        Options options = new Options();
        options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, (Object)refreshAsync);
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), options);
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f0"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.table.open();
        BatchWriteBuilder writeBuilder = storeTable.newBatchWriteBuilder();
        HashSet<Integer> insertKeys = new HashSet<Integer>();
        try (BatchTableWrite write = writeBuilder.newWrite();){
            for (int i = 1; i <= 100000; ++i) {
                insertKeys.add(i);
                write.write(LookupTableTest.row(i, 11 * i, 111 * i), 0);
            }
            try (BatchTableCommit commit = writeBuilder.newCommit();){
                commit.commit(write.prepareCommit());
            }
        }
        this.table.refresh();
        HashSet<Integer> batchKeys = new HashSet<Integer>();
        long start = System.currentTimeMillis();
        while (batchKeys.size() < 100000) {
            Thread.sleep(10L);
            for (int i = 1; i <= 100000; ++i) {
                List result = this.table.get(LookupTableTest.row(i));
                if (result.isEmpty()) continue;
                Assertions.assertThat((List)result).hasSize(1);
                LookupTableTest.assertRow((InternalRow)result.get(0), i, 11 * i, 111 * i);
                batchKeys.add(i);
            }
            if (System.currentTimeMillis() - start <= 30000L) continue;
            throw new TimeoutException();
        }
        Assertions.assertThat(batchKeys).isEqualTo(insertKeys);
        for (int k = 0; k < 10; ++k) {
            try (BatchTableWrite write = writeBuilder.newWrite();){
                for (int i = 1; i <= 100; ++i) {
                    write.write(LookupTableTest.row(i, 11 * i, 111 * i), 0);
                }
                try (BatchTableCommit commit = writeBuilder.newCommit();){
                    commit.commit(write.prepareCommit());
                    continue;
                }
            }
        }
        this.table.refresh();
        this.table.close();
    }

    @TestTemplate
    public void testFullCacheLookupTableWithForceLookup() throws Exception {
        Options options = new Options();
        options.set(CoreOptions.MERGE_ENGINE, (Object)CoreOptions.MergeEngine.PARTIAL_UPDATE);
        options.set(FlinkConnectorOptions.LOOKUP_CACHE_MODE, (Object)(this.inMemory ? FlinkConnectorOptions.LookupCacheMode.MEMORY : FlinkConnectorOptions.LookupCacheMode.FULL));
        options.set(CoreOptions.WRITE_ONLY, (Object)true);
        options.set(CoreOptions.FORCE_LOOKUP, (Object)true);
        options.set(CoreOptions.BUCKET, (Object)1);
        FileStoreTable storeTable = this.createTable(Collections.singletonList("f0"), options);
        FileStoreTable compactTable = storeTable.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
        FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, new int[]{0, 1, 2}, null, null, this.tempDir.toFile(), Collections.singletonList("f0"), null);
        this.table = FullCacheLookupTable.create((FullCacheLookupTable.Context)context, (long)(ThreadLocalRandom.current().nextInt(2) * 10));
        this.write((Table)storeTable, this.ioManager, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 11, 111})});
        this.compact((Table)compactTable, BinaryRow.EMPTY_ROW, 0, this.ioManager, true);
        this.table.open();
        List result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 11, 111);
        this.write((Table)storeTable, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, null, 222})});
        this.table.refresh();
        result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 11, 111);
        this.compact((Table)compactTable, BinaryRow.EMPTY_ROW, 0, this.ioManager, false);
        this.table.refresh();
        result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 11, 222);
        this.write((Table)storeTable, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, 22, null})});
        this.table.refresh();
        result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 11, 222);
        this.compact((Table)compactTable, BinaryRow.EMPTY_ROW, 0, this.ioManager, true);
        this.table.refresh();
        result = this.table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, 22, 222);
    }

    @TestTemplate
    public void testPartialLookupTableWithForceLookup() throws Exception {
        Options options = new Options();
        options.set(CoreOptions.MERGE_ENGINE, (Object)CoreOptions.MergeEngine.PARTIAL_UPDATE);
        options.set(CoreOptions.CHANGELOG_PRODUCER, (Object)CoreOptions.ChangelogProducer.NONE);
        options.set(CoreOptions.FORCE_LOOKUP, (Object)true);
        options.set(CoreOptions.BUCKET, (Object)1);
        FileStoreTable dimTable = this.createTable(Collections.singletonList("f0"), options);
        PrimaryKeyPartialLookupTable table = PrimaryKeyPartialLookupTable.createLocalTable((FileStoreTable)dimTable, (int[])new int[]{0, 1, 2}, (File)this.tempDir.toFile(), (List)ImmutableList.of((Object)"f0"), null);
        table.open();
        List result = table.get(LookupTableTest.row(1, -1));
        Assertions.assertThat((List)result).hasSize(0);
        this.write((Table)dimTable, this.ioManager, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, -1, 11}), GenericRow.of((Object[])new Object[]{2, -2, 22})});
        result = table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(0);
        table.refresh();
        result = table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, -1, 11);
        result = table.get(LookupTableTest.row(2));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 2, -2, 22);
        this.write((Table)dimTable, this.ioManager, new InternalRow[]{GenericRow.of((Object[])new Object[]{1, null, 111})});
        table.refresh();
        result = table.get(LookupTableTest.row(1));
        Assertions.assertThat((List)result).hasSize(1);
        LookupTableTest.assertRow((InternalRow)result.get(0), 1, -1, 111);
    }

    private FileStoreTable createDimTable() throws Exception {
        LocalFileIO fileIO = LocalFileIO.create();
        Path tablePath = new Path(String.format("%s/%s.db/%s", this.warehouse, this.database, "T"));
        Schema schema = Schema.newBuilder().column("pk1", (DataType)DataTypes.INT()).column("pk2", (DataType)DataTypes.INT()).column("col2", (DataType)DataTypes.INT()).primaryKey(new String[]{"pk1", "pk2"}).option(CoreOptions.BUCKET.key(), "2").option(CoreOptions.BUCKET_KEY.key(), "pk2").build();
        TableSchema tableSchema = SchemaUtils.forceCommit((SchemaManager)new SchemaManager((FileIO)fileIO, tablePath), (Schema)schema);
        return FileStoreTableFactory.create((FileIO)LocalFileIO.create(), (Path)tablePath, (TableSchema)tableSchema);
    }

    private static InternalRow row(Object ... values) {
        return LookupTableTest.row(RowKind.INSERT, values);
    }

    private static InternalRow row(RowKind kind, Object ... values) {
        GenericRow row = new GenericRow(kind, values.length);
        for (int i = 0; i < values.length; ++i) {
            row.setField(i, values[i]);
        }
        return row;
    }

    private static void assertRow(InternalRow resultRow, int ... expected) {
        int[] results = new int[expected.length];
        for (int i = 0; i < results.length; ++i) {
            results[i] = resultRow.getInt(i);
        }
        Assertions.assertThat((int[])results).containsExactly(expected);
        Assertions.assertThat((int)resultRow.getFieldCount()).isEqualTo(expected.length);
    }
}

