/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration tests that insert into Paimon tables using the {@code sink.clustering.*} sink
 * options and then inspect the data files that were written.
 *
 * <p>The data-driven tests write {@link #SINK_ROW_NUMBER} random rows into {@code test_table}
 * (created by {@link #ddl()}) and check either the value range of the first column in every
 * written file, or that a filter on the first column reduces the number of planned files.
 */
public class RangePartitionAndSortForUnawareBucketTableITCase extends CatalogITCaseBase {

    /** Number of rows generated per test; also used as the upper bound of the generated values. */
    private static final int SINK_ROW_NUMBER = 1000;

    /**
     * Value of the {@code 'sink.parallelism'} hint used by the inserts below; the tests also
     * assert that exactly this many files are planned after the write.
     */
    private static final int SINK_PARALLELISM = 10;

    /** Verifies that invalid clustering configurations are rejected with a descriptive message. */
    @Test
    public void testSortConfigurationChecks() {
        batchSql("CREATE TEMPORARY TABLE source1 (col1 INT, col2 INT, col3 INT, col4 INT) WITH ('connector'='values', 'bounded'='true')");
        batchSql("CREATE TABLE IF NOT EXISTS sink1 (col1 INT, col2 INT, col3 INT, col4 INT)");
        streamSqlIter("CREATE TEMPORARY TABLE source2 (col1 INT, col2 INT, col3 INT, col4 INT) WITH ('connector'='values')");
        streamSqlIter("CREATE TABLE IF NOT EXISTS sink2 (col1 INT, col2 INT, col3 INT, col4 INT)");

        // A clustering column that is not part of the table schema.
        Assertions.assertThatExceptionOfType(RuntimeException.class)
                .isThrownBy(() -> batchSql("INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1,xx1') */ SELECT * FROM source1"))
                .withMessageContaining("should contains all clustering column names");

        // A sample factor of 10 is rejected.
        Assertions.assertThatExceptionOfType(RuntimeException.class)
                .isThrownBy(() -> batchSql("INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1', 'sink.clustering.sample-factor' = '10') */ SELECT * FROM source1"))
                .withMessageContaining("The minimum allowed sink.clustering.sample-factor");

        // Clustering the result of a join without specifying 'sink.parallelism'.
        batchSql("CREATE TEMPORARY TABLE source3 (col1 INT, col2 INT) WITH ('connector'='values', 'bounded'='true')");
        Assertions.assertThatExceptionOfType(RuntimeException.class)
                .isThrownBy(() -> batchSql("INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1') */ SELECT source1.col1, source1.col2, source3.col1, source3.col2 FROM source1 JOIN source3 ON source1.col1 = source3.col1"))
                .withMessageContaining("The sink parallelism must be specified when sorting the table data");
    }

    /**
     * Writes with {@code 'sink.clustering.sort-in-cluster' = 'false'} and checks that the
     * {@code col1} ranges of the written files do not overlap.
     */
    @Test
    public void testRangePartition() throws Exception {
        writeRandomRowsToTestTable("'sink.clustering.by-columns' = 'col1', 'sink.parallelism' = '10', 'sink.clustering.sort-in-cluster' = 'false'");

        FileStoreTable testStoreTable = paimonTable("test_table");
        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
        Assertions.assertThat(files.size()).isEqualTo(SINK_PARALLELISM);

        assertFileRangesDoNotOverlap(collectMinMaxOfEachFile(testStoreTable, files, false));
    }

    /**
     * Writes with the {@code order} strategy and checks that {@code col1} is non-decreasing inside
     * every file and that the {@code col1} ranges of the written files do not overlap.
     */
    @Test
    public void testRangePartitionAndSortWithOrderStrategy() throws Exception {
        writeRandomRowsToTestTable("'sink.clustering.by-columns' = 'col1', 'sink.parallelism' = '10', 'sink.clustering.strategy' = 'order'");

        FileStoreTable testStoreTable = paimonTable("test_table");
        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
        Assertions.assertThat(files.size()).isEqualTo(SINK_PARALLELISM);

        assertFileRangesDoNotOverlap(collectMinMaxOfEachFile(testStoreTable, files, true));
    }

    /**
     * Writes with the {@code zorder} strategy and checks that a range filter on {@code col1} plans
     * fewer files than an unfiltered scan.
     */
    @Test
    public void testRangePartitionAndSortWithZOrderStrategy() throws Exception {
        writeRandomRowsToTestTable("'sink.clustering.by-columns' = 'col1', 'sink.parallelism' = '10', 'sink.clustering.strategy' = 'zorder'");
        assertFilterOnFirstColumnPrunesFiles();
    }

    /**
     * Writes with the {@code hilbert} strategy on {@code col1,col2} and checks that a range filter
     * on {@code col1} plans fewer files than an unfiltered scan.
     */
    @Test
    public void testRangePartitionAndSortWithHilbertStrategy() throws Exception {
        writeRandomRowsToTestTable("'sink.clustering.by-columns' = 'col1,col2', 'sink.parallelism' = '10', 'sink.clustering.strategy' = 'hilbert'");
        assertFilterOnFirstColumnPrunesFiles();
    }

    /**
     * Registers freshly generated rows as the bounded {@code test_source} table, inserts them into
     * {@code test_table} with the given sink hints and asserts that all rows can be read back.
     *
     * @param sinkOptions the content of the {@code OPTIONS(...)} hint applied to the insert
     */
    private void writeRandomRowsToTestTable(String sinkOptions) {
        String dataId = TestValuesTableFactory.registerData(generateSinkRows());
        batchSql("CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')", dataId);
        batchSql("INSERT INTO test_table /*+ OPTIONS(" + sinkOptions + ") */ SELECT * FROM test_source");

        List<Row> sinkRows = batchSql("SELECT * FROM test_table");
        Assertions.assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
    }

    /**
     * Reads every given file on its own and records the smallest and largest value of the first
     * column found in it.
     *
     * @param table the table the files belong to
     * @param files the manifest entries to read, one split is built per entry
     * @param expectSortedRows when {@code true}, additionally asserts that the first column never
     *     decreases from one row to the next within a file
     * @return one {@code (min, max)} tuple per file, in the order of {@code files}
     */
    private List<Tuple2<Integer, Integer>> collectMinMaxOfEachFile(
            FileStoreTable table, List<ManifestEntry> files, boolean expectSortedRows)
            throws Exception {
        List<Tuple2<Integer, Integer>> minMaxOfEachFile = new ArrayList<>(files.size());
        for (ManifestEntry file : files) {
            DataSplit dataSplit =
                    DataSplit.builder()
                            .withPartition(file.partition())
                            .withBucket(file.bucket())
                            .withDataFiles(Collections.singletonList(file.file()))
                            .withBucketPath("/temp/xxx")
                            .build();

            // Atomic holders are used only because the lambda needs effectively final captures.
            AtomicInteger min = new AtomicInteger(Integer.MAX_VALUE);
            AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
            AtomicInteger previous = new AtomicInteger(Integer.MIN_VALUE);
            table.newReadBuilder()
                    .newRead()
                    .createReader(dataSplit)
                    .forEachRemaining(
                            row -> {
                                int value = row.getInt(0);
                                min.set(Math.min(min.get(), value));
                                max.set(Math.max(max.get(), value));
                                if (expectSortedRows) {
                                    Assertions.assertThat(value)
                                            .isGreaterThanOrEqualTo(previous.get());
                                    previous.set(value);
                                }
                            });
            minMaxOfEachFile.add(Tuple2.of(min.get(), max.get()));
        }
        return minMaxOfEachFile;
    }

    /**
     * Sorts the given {@code (min, max)} tuples by their minimum (the list is sorted in place) and
     * asserts that every tuple lies within {@code [0, SINK_ROW_NUMBER]} and starts at or after the
     * maximum of the tuple directly before it.
     *
     * @param minMaxOfEachFile the per-file value ranges to verify
     */
    private void assertFileRangesDoNotOverlap(List<Tuple2<Integer, Integer>> minMaxOfEachFile) {
        minMaxOfEachFile.sort(Comparator.comparing(minMax -> minMax.f0));

        Tuple2<Integer, Integer> previous = null;
        for (Tuple2<Integer, Integer> current : minMaxOfEachFile) {
            Assertions.assertThat(current.f0).isGreaterThanOrEqualTo(0);
            Assertions.assertThat(current.f1).isLessThanOrEqualTo(SINK_ROW_NUMBER);
            if (previous != null) {
                Assertions.assertThat(current.f0).isGreaterThanOrEqualTo(previous.f1);
            }
            // Advance the predecessor so that each file is compared with its direct neighbour,
            // not only with the first file.
            previous = current;
        }
    }

    /**
     * Asserts that an unfiltered scan of {@code test_table} plans {@link #SINK_PARALLELISM} files
     * and that a scan filtered with {@code col1 BETWEEN 100 AND 200} plans strictly fewer.
     */
    private void assertFilterOnFirstColumnPrunesFiles() throws Exception {
        FileStoreTable testStoreTable = paimonTable("test_table");
        Predicate predicate = new PredicateBuilder(testStoreTable.rowType()).between(0, 100, 200);

        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
        Assertions.assertThat(files.size()).isEqualTo(SINK_PARALLELISM);

        List<ManifestEntry> filesFilter =
                ((AppendOnlyFileStoreScan) testStoreTable.store().newScan())
                        .withFilter(predicate)
                        .plan()
                        .files();
        Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
    }

    /**
     * Generates {@link #SINK_ROW_NUMBER} insert rows with four integer columns, each drawn
     * uniformly from {@code [0, SINK_ROW_NUMBER)}.
     *
     * @return the generated rows
     */
    private List<Row> generateSinkRows() {
        List<Row> sinkRows = new ArrayList<>(SINK_ROW_NUMBER);
        // NOTE(review): the generator is unseeded, so the data differs between runs.
        Random random = new Random();
        for (int round = 0; round < SINK_ROW_NUMBER; ++round) {
            sinkRows.add(
                    Row.ofKind(
                            RowKind.INSERT,
                            random.nextInt(SINK_ROW_NUMBER),
                            random.nextInt(SINK_ROW_NUMBER),
                            random.nextInt(SINK_ROW_NUMBER),
                            random.nextInt(SINK_ROW_NUMBER)));
        }
        return sinkRows;
    }

    /** Returns the DDL statement that creates the {@code test_table} written to by the tests. */
    @Override
    protected List<String> ddl() {
        return Collections.singletonList("CREATE TABLE IF NOT EXISTS test_table (col1 INT, col2 INT, col3 INT, col4 INT)");
    }
}

