/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.lookup.partitioner.BucketIdExtractor;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class BucketIdExtractorTest
extends CatalogITCaseBase {
    private static final int BUCKET_NUMBER = 5;
    private static final int COL_NUMBER = 5;
    private static final int ROW_NUMBER = 100;

    @Test
    public void testJoinKeyEqualToSingleBucketKey() throws Exception {
        Random seed = new Random();
        int bucketId = seed.nextInt(5);
        int bucketKeyIndex = seed.nextInt(5);
        String bucketKeyName = "col" + (bucketKeyIndex + 1);
        FileStoreTable table = this.createTestTable(bucketKeyName);
        List<List<Object>> bucketKeyRows = this.getGroundTruthBucketKeyRows(table, bucketId, this.createBucketKeyGetter(Collections.singletonList(bucketKeyIndex)));
        List<String> joinKeyNames = Collections.singletonList(bucketKeyName);
        List<RowData> joinKeyRows = this.generateJoinKeyRows(bucketKeyRows, bucketKeyRow -> GenericRowData.of((Object[])new Object[]{bucketKeyRow.get(0)}));
        BucketIdExtractor bucketIdExtractor = new BucketIdExtractor(5, table.schema(), joinKeyNames, joinKeyNames);
        this.checkCorrectness(bucketIdExtractor, bucketId, joinKeyRows);
    }

    @Test
    public void testJoinKeysContainSingleKey() throws Exception {
        Random seed = new Random();
        List<List<Integer>> joinKeyIndexes = this.getColumnIndexCombinations();
        int bucketId = seed.nextInt(5);
        List<Integer> joinKeyIndex = joinKeyIndexes.get(seed.nextInt(joinKeyIndexes.size()));
        int bucketKeyIndex = joinKeyIndex.get(1);
        String bucketKeyName = "col" + bucketKeyIndex;
        FileStoreTable table = this.createTestTable(bucketKeyName);
        List<List<Object>> bucketKeyRows = this.getGroundTruthBucketKeyRows(table, bucketId, this.createBucketKeyGetter(Collections.singletonList(bucketKeyIndex - 1)));
        List<String> bucketKeyNames = Collections.singletonList(bucketKeyName);
        List<String> joinKeyNames = Arrays.asList("col" + joinKeyIndex.get(0), bucketKeyName);
        List<RowData> joinKeyRows = this.generateJoinKeyRows(bucketKeyRows, bucketKeyRow -> GenericRowData.of((Object[])new Object[]{this.generateFakeColumnValue((Integer)joinKeyIndex.get(0)), bucketKeyRow.get(0)}));
        BucketIdExtractor bucketIdExtractor = new BucketIdExtractor(5, table.schema(), joinKeyNames, bucketKeyNames);
        this.checkCorrectness(bucketIdExtractor, bucketId, joinKeyRows);
    }

    @Test
    public void testJoinKeysEqualToMultiBucketKeys() throws Exception {
        Random seed = new Random();
        List<List<Integer>> bucketKeyIndexes = this.getColumnIndexCombinations();
        int bucketId = seed.nextInt(5);
        List<Integer> bucketKeyIndex = bucketKeyIndexes.get(seed.nextInt(bucketKeyIndexes.size()));
        String bucketKeyName = "col" + bucketKeyIndex.get(0) + ",col" + bucketKeyIndex.get(1);
        FileStoreTable table = this.createTestTable(bucketKeyName);
        List<List<Object>> bucketKeyRows = this.getGroundTruthBucketKeyRows(table, bucketId, this.createBucketKeyGetter(Arrays.asList(bucketKeyIndex.get(0) - 1, bucketKeyIndex.get(1) - 1)));
        List<String> joinKeyNames = Arrays.asList(bucketKeyName.split(","));
        List<RowData> joinKeyRows = this.generateJoinKeyRows(bucketKeyRows, bucketKeyRow -> GenericRowData.of((Object[])new Object[]{bucketKeyRow.get(0), bucketKeyRow.get(1)}));
        BucketIdExtractor bucketIdExtractor = new BucketIdExtractor(5, table.schema(), joinKeyNames, joinKeyNames);
        this.checkCorrectness(bucketIdExtractor, bucketId, joinKeyRows);
    }

    private List<RowData> generateJoinKeyRows(List<List<Object>> bucketKeyRows, Function<List<Object>, RowData> converter) {
        ArrayList<RowData> joinKeyRows = new ArrayList<RowData>();
        for (List<Object> bucketKeyRow : bucketKeyRows) {
            joinKeyRows.add(converter.apply(bucketKeyRow));
        }
        return joinKeyRows;
    }

    private void checkCorrectness(BucketIdExtractor extractor, int targetBucketId, List<RowData> joinKeyRows) {
        for (RowData joinKeyRow : joinKeyRows) {
            Assertions.assertThat((int)extractor.extractBucketId(joinKeyRow)).isEqualTo(targetBucketId);
        }
    }

    private List<List<Object>> getGroundTruthBucketKeyRows(FileStoreTable table, int bucketId, Function<InternalRow, List<Object>> bucketKeyGetter) throws IOException {
        List files = table.store().newScan().withBucket(bucketId).plan().files();
        ArrayList<List<Object>> bucketKeyRows = new ArrayList<List<Object>>();
        for (ManifestEntry file : files) {
            DataSplit dataSplit = DataSplit.builder().withPartition(file.partition()).withBucket(file.bucket()).withDataFiles(Collections.singletonList(file.file())).withBucketPath("not used").build();
            table.newReadBuilder().newRead().createReader((Split)dataSplit).forEachRemaining(internalRow -> bucketKeyRows.add((List<Object>)bucketKeyGetter.apply((InternalRow)internalRow)));
        }
        return bucketKeyRows;
    }

    private FileStoreTable createTestTable(String bucketKey) throws Exception {
        String tableName = "Test";
        String ddl = String.format("CREATE TABLE %s (col1 INT, col2 STRING, col3 FLOAT, col4 INT, col5 BOOLEAN ) WITH ('bucket'='%s'", tableName, 5);
        if (bucketKey != null) {
            ddl = ddl + ", 'bucket-key' = '" + bucketKey + "')";
        }
        this.batchSql(ddl, new Object[0]);
        Random seed = new Random();
        StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s VALUES ", tableName));
        for (int index = 1; index < 100; ++index) {
            dml.append(String.format("(%s, '%s', %s, %s, %s), ", seed.nextInt(100), seed.nextInt(100), Float.valueOf(101.1f + (float)seed.nextInt(50)), seed.nextInt(100), seed.nextBoolean() ? "true" : "false"));
        }
        dml.append(String.format("(%s, '%s', %s, %s, %s)", seed.nextInt(100), seed.nextInt(100), Float.valueOf(101.1f + (float)seed.nextInt(50)), seed.nextInt(100), seed.nextBoolean() ? "true" : "false"));
        this.batchSql(dml.toString(), new Object[0]);
        return this.paimonTable(tableName);
    }

    private Function<InternalRow, List<Object>> createBucketKeyGetter(List<Integer> bucketKeyIndexes) {
        return row -> {
            ArrayList<Serializable> bucketKeys = new ArrayList<Serializable>();
            block7: for (Integer bucketKeyIndex : bucketKeyIndexes) {
                switch (bucketKeyIndex) {
                    case 0: {
                        bucketKeys.add(Integer.valueOf(row.getInt(0)));
                        continue block7;
                    }
                    case 1: {
                        bucketKeys.add((Serializable)StringData.fromString((String)row.getString(1).toString()));
                        continue block7;
                    }
                    case 2: {
                        bucketKeys.add(Float.valueOf(row.getFloat(2)));
                        continue block7;
                    }
                    case 3: {
                        bucketKeys.add(Integer.valueOf(row.getInt(3)));
                        continue block7;
                    }
                    case 4: {
                        bucketKeys.add(Boolean.valueOf(row.getBoolean(4)));
                        continue block7;
                    }
                }
                throw new UnsupportedOperationException();
            }
            return bucketKeys;
        };
    }

    private List<List<Integer>> getColumnIndexCombinations() {
        ArrayList<List<Integer>> bucketIndexes = new ArrayList<List<Integer>>();
        for (int i = 1; i <= 5; ++i) {
            for (int j = i + 1; j <= 5; ++j) {
                bucketIndexes.add(Arrays.asList(i, j));
            }
        }
        return bucketIndexes;
    }

    private Object generateFakeColumnValue(Integer columnIndex) {
        switch (columnIndex) {
            case 0: {
                return 1;
            }
            case 1: {
                return StringData.fromString((String)"Test");
            }
            case 2: {
                return Float.valueOf(1.21f);
            }
            case 3: {
                return 10;
            }
            case 4: {
                return false;
            }
        }
        throw new UnsupportedOperationException();
    }
}

