/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.SortCompactAction;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class SortCompactActionForUnawareBucketITCase
extends ActionITCaseBase {
    private static final Random RANDOM = new Random();

    private void prepareData(int size, int loop) throws Exception {
        this.createTable();
        ArrayList<CommitMessage> commitMessages = new ArrayList<CommitMessage>();
        for (int i = 0; i < loop; ++i) {
            commitMessages.addAll(this.writeData(size));
        }
        this.commit(commitMessages);
    }

    private void prepareSameData(int size) throws Exception {
        this.createTable();
        BatchWriteBuilder builder = this.getTable().newBatchWriteBuilder();
        try (BatchTableWrite batchTableWrite = builder.newWrite();){
            for (int i = 0; i < size; ++i) {
                batchTableWrite.write(SortCompactActionForUnawareBucketITCase.data(0, 0, 0));
            }
            this.commit(batchTableWrite.prepareCommit());
        }
    }

    @Test
    public void testOrderBy() throws Exception {
        this.prepareData(300, 1);
        Assertions.assertThatCode(() -> this.order(Arrays.asList("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "f13", "f14", "f15"))).doesNotThrowAnyException();
    }

    @Test
    public void testOrderResult() throws Exception {
        this.prepareData(300, 2);
        Assertions.assertThatCode(() -> this.order(Arrays.asList("f1", "f2"))).doesNotThrowAnyException();
        List files = this.getTable().store().newScan().plan().files();
        ManifestEntry entry = (ManifestEntry)files.get(0);
        DataSplit dataSplit = DataSplit.builder().withPartition(entry.partition()).withBucket(entry.bucket()).withDataFiles(Collections.singletonList(entry.file())).withBucketPath("not used").build();
        AtomicInteger i = new AtomicInteger(Integer.MIN_VALUE);
        this.getTable().newReadBuilder().newRead().createReader((Split)dataSplit).forEachRemaining(a -> {
            Integer current = a.getInt(1);
            Assertions.assertThat((Integer)current).isGreaterThanOrEqualTo(i.get());
            i.set(current);
        });
        Assertions.assertThatCode(() -> this.order(Arrays.asList("f2", "f1"))).doesNotThrowAnyException();
        files = this.getTable().store().newScan().plan().files();
        entry = (ManifestEntry)files.get(0);
        dataSplit = DataSplit.builder().withPartition(entry.partition()).withBucket(entry.bucket()).withDataFiles(Collections.singletonList(entry.file())).withBucketPath("not used").build();
        i.set(Integer.MIN_VALUE);
        this.getTable().newReadBuilder().newRead().createReader((Split)dataSplit).forEachRemaining(a -> {
            Integer current = a.getInt(2);
            Assertions.assertThat((Integer)current).isGreaterThanOrEqualTo(i.get());
            i.set(current);
        });
    }

    @Test
    public void testAllBasicTypeWorksWithZorder() throws Exception {
        this.prepareData(300, 1);
        Assertions.assertThatCode(() -> this.zorder(Arrays.asList("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "f13", "f14", "f15"))).doesNotThrowAnyException();
    }

    @Test
    public void testAllBasicTypeWorksWithHilbert() throws Exception {
        this.prepareData(300, 1);
        Assertions.assertThatCode(() -> this.hilbert(Arrays.asList("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "f13", "f14", "f15"))).doesNotThrowAnyException();
    }

    @Test
    public void testZorderActionWorks() throws Exception {
        this.prepareData(300, 2);
        PredicateBuilder predicateBuilder = new PredicateBuilder(this.getTable().rowType());
        Predicate predicate = predicateBuilder.between(1, (Object)100, (Object)200);
        List files = this.getTable().store().newScan().plan().files();
        List filesFilter = ((AppendOnlyFileStoreScan)this.getTable().store().newScan()).withFilter(predicate).plan().files();
        Assertions.assertThat((int)files.size()).isEqualTo(filesFilter.size());
        this.zorder(Arrays.asList("f2", "f1"));
        files = this.getTable().store().newScan().plan().files();
        filesFilter = ((AppendOnlyFileStoreScan)this.getTable().store().newScan()).withFilter(predicate).plan().files();
        Assertions.assertThat((int)files.size()).isGreaterThan(filesFilter.size());
    }

    @Test
    public void testHilbertActionWorks() throws Exception {
        this.prepareData(300, 2);
        PredicateBuilder predicateBuilder = new PredicateBuilder(this.getTable().rowType());
        Predicate predicate = predicateBuilder.between(1, (Object)100, (Object)200);
        List files = this.getTable().store().newScan().plan().files();
        List filesFilter = ((AppendOnlyFileStoreScan)this.getTable().store().newScan()).withFilter(predicate).plan().files();
        Assertions.assertThat((int)files.size()).isEqualTo(filesFilter.size());
        this.hilbert(Arrays.asList("f2", "f1"));
        files = this.getTable().store().newScan().plan().files();
        filesFilter = ((AppendOnlyFileStoreScan)this.getTable().store().newScan()).withFilter(predicate).plan().files();
        Assertions.assertThat((int)files.size()).isGreaterThan(filesFilter.size());
    }

    @Test
    public void testCompareZorderAndOrder() throws Exception {
        this.prepareData(300, 10);
        this.zorder(Arrays.asList("f2", "f1"));
        PredicateBuilder predicateBuilder = new PredicateBuilder(this.getTable().rowType());
        Predicate predicate = predicateBuilder.between(1, (Object)10, (Object)20);
        List filesZorder = this.getTable().store().newScan().plan().files();
        List filesFilterZorder = ((AppendOnlyFileStoreScan)this.getTable().store().newScan()).withFilter(predicate).plan().files();
        this.order(Arrays.asList("f2", "f1"));
        List filesOrder = this.getTable().store().newScan().plan().files();
        List filesFilterOrder = ((AppendOnlyFileStoreScan)this.getTable().store().newScan()).withFilter(predicate).plan().files();
        Assertions.assertThat((double)((double)filesFilterZorder.size() / (double)filesZorder.size())).isLessThan((double)filesFilterOrder.size() / (double)filesOrder.size());
    }

    @Test
    public void testCompareHilbertAndOrder() throws Exception {
        this.prepareData(300, 10);
        this.hilbert(Arrays.asList("f2", "f1"));
        PredicateBuilder predicateBuilder = new PredicateBuilder(this.getTable().rowType());
        Predicate predicate = predicateBuilder.between(1, (Object)10, (Object)20);
        List filesHilbert = this.getTable().store().newScan().plan().files();
        List filesFilterHilbert = ((AppendOnlyFileStoreScan)this.getTable().store().newScan()).withFilter(predicate).plan().files();
        this.order(Arrays.asList("f2", "f1"));
        List filesOrder = this.getTable().store().newScan().plan().files();
        List filesFilterOrder = ((AppendOnlyFileStoreScan)this.getTable().store().newScan()).withFilter(predicate).plan().files();
        Assertions.assertThat((double)((double)filesFilterHilbert.size() / (double)filesHilbert.size())).isLessThan((double)filesFilterOrder.size() / (double)filesOrder.size());
    }

    @Test
    public void testTableConf() throws Exception {
        this.createTable();
        SortCompactAction sortCompactAction = new SortCompactAction(this.warehouse, this.database, this.tableName, Collections.emptyMap(), Collections.singletonMap(FlinkConnectorOptions.SINK_PARALLELISM.key(), "20")).withOrderStrategy("zorder").withOrderColumns(Collections.singletonList("f0"));
        Assertions.assertThat((String)((String)sortCompactAction.table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key()))).isEqualTo("20");
    }

    @Test
    public void testRandomSuffixWorks() throws Exception {
        this.prepareSameData(200);
        Assertions.assertThatCode(() -> this.order(Collections.singletonList("f1"))).doesNotThrowAnyException();
        List files = this.getTable().store().newScan().plan().files();
        Assertions.assertThat((int)files.size()).isEqualTo(3);
        this.dropTable();
        this.prepareSameData(200);
        Assertions.assertThatCode(() -> this.zorder(Arrays.asList("f1", "f2"))).doesNotThrowAnyException();
        files = this.getTable().store().newScan().plan().files();
        Assertions.assertThat((int)files.size()).isEqualTo(3);
    }

    @Test
    public void testSortCompactionOnEmptyData() throws Exception {
        this.createTable();
        SortCompactAction sortCompactAction = new SortCompactAction(this.warehouse, this.database, this.tableName, Collections.emptyMap(), Collections.emptyMap()).withOrderStrategy("zorder").withOrderColumns(Collections.singletonList("f0"));
        sortCompactAction.run();
    }

    private void zorder(List<String> columns) throws Exception {
        String rangeStrategy = RANDOM.nextBoolean() ? "size" : "quantity";
        this.createAction("zorder", rangeStrategy, columns).run();
    }

    private void hilbert(List<String> columns) throws Exception {
        String rangeStrategy = RANDOM.nextBoolean() ? "size" : "quantity";
        this.createAction("hilbert", rangeStrategy, columns).run();
    }

    private void order(List<String> columns) throws Exception {
        String rangeStrategy = RANDOM.nextBoolean() ? "size" : "quantity";
        this.createAction("order", rangeStrategy, columns).run();
    }

    private SortCompactAction createAction(String orderStrategy, String rangeStrategy, List<String> columns) {
        return this.createAction(orderStrategy, rangeStrategy, columns, Lists.newArrayList());
    }

    private SortCompactAction createAction(String orderStrategy, String rangeStrategy, List<String> columns, List<String> extraConfigs) {
        ArrayList args = Lists.newArrayList((Object[])new String[]{"compact", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--order_strategy", orderStrategy, "--order_by", String.join((CharSequence)",", columns), "--table_conf", "sort-compaction.range-strategy=" + rangeStrategy});
        args.addAll(extraConfigs);
        return this.createAction(SortCompactAction.class, args.toArray(new String[0]));
    }

    @Test
    public void testvalidSampleConfig() throws Exception {
        this.prepareData(300, 1);
        ArrayList extraCompactionConfig = Lists.newArrayList((Object[])new String[]{"--table_conf", "sort-compaction.local-sample.magnification=1"});
        Assertions.assertThatCode(() -> this.createAction("order", "size", Arrays.asList("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "f13", "f14", "f15"), extraCompactionConfig).run()).hasMessage("the config 'sort-compaction.local-sample.magnification=1' should not be set too small,greater than or equal to 20 is needed.");
    }

    private void createTable() throws Exception {
        this.catalog.createDatabase(this.database, true);
        this.catalog.createTable(this.identifier(), SortCompactActionForUnawareBucketITCase.schema(), true);
    }

    private void dropTable() throws Exception {
        this.catalog.dropTable(this.identifier(), true);
    }

    private Identifier identifier() {
        return Identifier.create((String)this.database, (String)this.tableName);
    }

    private void commit(List<CommitMessage> messages) throws Exception {
        BatchTableCommit commit = this.getTable().newBatchWriteBuilder().newCommit();
        commit.commit(messages);
        commit.close();
    }

    private static Schema schema() {
        Schema.Builder schemaBuilder = Schema.newBuilder();
        schemaBuilder.column("f0", (DataType)DataTypes.TINYINT());
        schemaBuilder.column("f1", (DataType)DataTypes.INT());
        schemaBuilder.column("f2", (DataType)DataTypes.SMALLINT());
        schemaBuilder.column("f3", (DataType)DataTypes.STRING());
        schemaBuilder.column("f4", (DataType)DataTypes.DOUBLE());
        schemaBuilder.column("f5", (DataType)DataTypes.CHAR((int)10));
        schemaBuilder.column("f6", (DataType)DataTypes.VARCHAR((int)10));
        schemaBuilder.column("f7", (DataType)DataTypes.BOOLEAN());
        schemaBuilder.column("f8", (DataType)DataTypes.DATE());
        schemaBuilder.column("f9", (DataType)DataTypes.TIME());
        schemaBuilder.column("f10", (DataType)DataTypes.TIMESTAMP());
        schemaBuilder.column("f11", (DataType)DataTypes.DECIMAL((int)10, (int)2));
        schemaBuilder.column("f12", (DataType)DataTypes.BYTES());
        schemaBuilder.column("f13", (DataType)DataTypes.FLOAT());
        schemaBuilder.column("f14", (DataType)DataTypes.BINARY((int)10));
        schemaBuilder.column("f15", (DataType)DataTypes.VARBINARY((int)10));
        schemaBuilder.option("bucket", "-1");
        schemaBuilder.option("scan.parallelism", "6");
        schemaBuilder.option("sink.parallelism", "3");
        schemaBuilder.option("target-file-size", "1 M");
        schemaBuilder.partitionKeys(new String[]{"f0"});
        return schemaBuilder.build();
    }

    private List<CommitMessage> writeData(int size) throws Exception {
        ArrayList<CommitMessage> messages = new ArrayList<CommitMessage>();
        for (int i = 0; i < 2; ++i) {
            messages.addAll(SortCompactActionForUnawareBucketITCase.writeOnce((Table)this.getTable(), i, size));
        }
        return messages;
    }

    private FileStoreTable getTable() throws Exception {
        return (FileStoreTable)this.catalog.getTable(this.identifier());
    }

    private static List<CommitMessage> writeOnce(Table table, int p, int size) throws Exception {
        BatchWriteBuilder builder = table.newBatchWriteBuilder();
        try (BatchTableWrite batchTableWrite = builder.newWrite();){
            for (int i = 0; i < size; ++i) {
                for (int j = 0; j < size; ++j) {
                    batchTableWrite.write(SortCompactActionForUnawareBucketITCase.data(p, i, j));
                }
            }
            List list = batchTableWrite.prepareCommit();
            return list;
        }
    }

    private static InternalRow data(int p, int i, int j) {
        return GenericRow.of((Object[])new Object[]{(byte)p, j, (short)i, BinaryString.fromString((String)String.valueOf(j)), 0.1 + (double)i, BinaryString.fromString((String)String.valueOf(j)), BinaryString.fromString((String)String.valueOf(i)), j % 2 == 1, i, j, Timestamp.fromEpochMillis((long)i), Decimal.zero((int)10, (int)2), String.valueOf(i).getBytes(), Float.valueOf(0.1f + (float)j), SortCompactActionForUnawareBucketITCase.randomBytes(), SortCompactActionForUnawareBucketITCase.randomBytes()});
    }

    private static byte[] randomBytes() {
        byte[] binary = new byte[RANDOM.nextInt(10)];
        RANDOM.nextBytes(binary);
        return binary;
    }
}

