/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.action.CompactActionITCaseBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class CompactActionITCase
extends CompactActionITCaseBase {
    @Test
    @Timeout(value=60L)
    public void testBatchCompact() throws Exception {
        FileStoreTable table = this.prepareTable(Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyList(), Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.writeData(this.rowData(2, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(2, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(2, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);
        this.runAction(false);
        this.checkLatestSnapshot(table, 3L, Snapshot.CommitKind.COMPACT);
        List splits = table.newSnapshotReader().read().dataSplits();
        Assertions.assertThat((int)splits.size()).isEqualTo(3);
        for (DataSplit split : splits) {
            if (split.partition().getInt(1) == 15) {
                Assertions.assertThat((int)split.dataFiles().size()).isEqualTo(1);
                continue;
            }
            Assertions.assertThat((int)split.dataFiles().size()).isEqualTo(2);
        }
    }

    @Test
    @Timeout(value=60L)
    public void testCompactWhenSkipLevel0() throws Exception {
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
        if (ThreadLocalRandom.current().nextBoolean()) {
            tableOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
        } else {
            tableOptions.put(CoreOptions.MERGE_ENGINE.key(), "first-row");
        }
        tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup");
        FileStoreTable table = this.prepareTable(Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyList(), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.writeData(this.rowData(2, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(2, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(2, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);
        Assertions.assertThat((int)table.newScan().plan().splits().size()).isEqualTo(0);
        this.runAction(false);
        this.checkLatestSnapshot(table, 3L, Snapshot.CommitKind.COMPACT);
        List splits = table.newSnapshotReader().read().dataSplits();
        Assertions.assertThat((int)splits.size()).isEqualTo(3);
        for (DataSplit split : splits) {
            if (split.partition().getInt(1) == 15) {
                Assertions.assertThat((int)split.dataFiles().size()).isEqualTo(1);
                continue;
            }
            Assertions.assertThat((int)split.dataFiles().size()).isEqualTo(2);
        }
    }

    @Test
    public void testStreamingCompact() throws Exception {
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
        tableOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
        tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
        tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
        FileStoreTable table = this.prepareTable(Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyList(), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 1L, Snapshot.CommitKind.APPEND);
        StreamTableScan scan = table.newReadBuilder().newStreamScan();
        TableScan.Plan plan = scan.plan();
        Assertions.assertThat((List)plan.splits()).isEmpty();
        this.runAction(true);
        this.validateResult(table, ROW_TYPE, scan, Arrays.asList("+I[1, 100, 15, 20221208]", "+I[1, 100, 15, 20221209]"), 60000L);
        this.writeData(this.rowData(1, 101, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 15, BinaryString.fromString((String)"20221209")));
        this.validateResult(table, ROW_TYPE, scan, Arrays.asList("+U[1, 101, 15, 20221208]", "+U[1, 101, 15, 20221209]", "-U[1, 100, 15, 20221208]", "-U[1, 100, 15, 20221209]"), 60000L);
        SnapshotManager snapshotManager = table.snapshotManager();
        CommonTestUtils.waitUtil(() -> snapshotManager.latestSnapshotId() - 2L == snapshotManager.earliestSnapshotId(), (Duration)Duration.ofSeconds(60000L), (Duration)Duration.ofSeconds(10L), (String)String.format("Cannot validate snapshot expiration in %s milliseconds.", 60000));
    }

    @ParameterizedTest(name="mode = {0}")
    @ValueSource(booleans={true, false})
    @Timeout(value=60L)
    public void testHistoryPartitionCompact(boolean mode) throws Exception {
        FileStoreTable table;
        String partitionIdleTime = "5s";
        if (mode) {
            table = this.prepareTable(Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyList(), Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
        } else {
            HashMap<String, String> tableOptions = new HashMap<String, String>();
            tableOptions.put(CoreOptions.BUCKET.key(), "-1");
            tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
            table = this.prepareTable(Arrays.asList("dt", "hh"), Collections.emptyList(), Collections.emptyList(), tableOptions);
        }
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.writeData(this.rowData(2, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(2, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(2, 100, 15, BinaryString.fromString((String)"20221209")));
        Thread.sleep(5000L);
        this.writeData(this.rowData(3, 100, 16, BinaryString.fromString((String)"20221208")));
        this.checkLatestSnapshot(table, 3L, Snapshot.CommitKind.APPEND);
        CompactAction action = this.createAction(CompactAction.class, "compact", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--partition_idle_time", partitionIdleTime);
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().batchMode().build();
        action.withStreamExecutionEnvironment(env).build();
        env.execute();
        this.checkLatestSnapshot(table, 4L, Snapshot.CommitKind.COMPACT);
        List splits = table.newSnapshotReader().read().dataSplits();
        Assertions.assertThat((int)splits.size()).isEqualTo(3);
        for (DataSplit split : splits) {
            if (split.partition().getInt(1) == 15) {
                Assertions.assertThat((int)split.dataFiles().size()).isEqualTo(1);
                continue;
            }
            Assertions.assertThat((int)split.dataFiles().size()).isEqualTo(3);
        }
    }

    @Test
    public void testStreamingCompactWithChangedExternalPath() throws Exception {
        String externalPath1 = this.getTempDirPath();
        String externalPath2 = this.getTempDirPath();
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
        tableOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        tableOptions.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "traceable://" + externalPath1);
        tableOptions.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "round-robin");
        tableOptions.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), "external-paths");
        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
        FileStoreTable table = this.prepareTable(Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyList(), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 1L, Snapshot.CommitKind.APPEND);
        StreamTableScan scan = table.newReadBuilder().newStreamScan();
        TableScan.Plan plan = scan.plan();
        Assertions.assertThat((List)plan.splits()).isEmpty();
        this.runAction(true);
        this.validateResult(table, ROW_TYPE, scan, Arrays.asList("+I[1, 100, 15, 20221208]", "+I[1, 100, 15, 20221209]"), 60000L);
        LocalFileIO fileIO = LocalFileIO.create();
        Assertions.assertThat((boolean)fileIO.exists(new Path(externalPath2))).isFalse();
        Assertions.assertThat((int)fileIO.listStatus(new Path(externalPath1)).length).isGreaterThanOrEqualTo(1);
        SchemaChange schemaChange = SchemaChange.setOption((String)CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), (String)("traceable://" + externalPath2));
        table.schemaManager().commitChanges(new SchemaChange[]{schemaChange});
        Thread.sleep(1000L);
        this.writeData(this.rowData(1, 101, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 15, BinaryString.fromString((String)"20221209")));
        this.validateResult(table, ROW_TYPE, scan, Arrays.asList("+U[1, 101, 15, 20221208]", "+U[1, 101, 15, 20221209]", "-U[1, 100, 15, 20221208]", "-U[1, 100, 15, 20221209]"), 60000L);
        Assertions.assertThat((boolean)fileIO.exists(new Path(externalPath2))).isTrue();
        Assertions.assertThat((int)fileIO.listStatus(new Path(externalPath2)).length).isGreaterThanOrEqualTo(1);
    }

    @Test
    public void testUnawareBucketStreamingCompact() throws Exception {
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
        FileStoreTable table = this.prepareTable(Collections.singletonList("k"), Collections.emptyList(), Collections.emptyList(), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);
        this.runActionForUnawareTable(true);
        this.checkFileAndRowSize(table, 3L, 30000L, 1, 6L);
        this.writeData(this.rowData(1, 101, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 15, BinaryString.fromString((String)"20221209")));
        this.checkFileAndRowSize(table, 5L, 30000L, 1, 9L);
    }

    @Test
    public void testUnawareBucketStreamingCompactWithChangedExternalPath() throws Exception {
        String externalPath1 = this.getTempDirPath();
        String externalPath2 = this.getTempDirPath();
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
        tableOptions.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "traceable://" + externalPath1);
        tableOptions.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "round-robin");
        tableOptions.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), "external-paths");
        FileStoreTable table = this.prepareTable(Collections.singletonList("k"), Collections.emptyList(), Collections.emptyList(), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);
        this.runActionForUnawareTable(true);
        this.checkFileAndRowSize(table, 3L, 30000L, 1, 6L);
        LocalFileIO fileIO = LocalFileIO.create();
        Assertions.assertThat((boolean)fileIO.exists(new Path(externalPath2))).isFalse();
        Assertions.assertThat((int)fileIO.listStatus(new Path(externalPath1)).length).isGreaterThanOrEqualTo(1);
        SchemaChange schemaChange = SchemaChange.setOption((String)CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), (String)("traceable://" + externalPath2));
        table.schemaManager().commitChanges(new SchemaChange[]{schemaChange});
        Thread.sleep(1000L);
        this.writeData(this.rowData(1, 101, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 15, BinaryString.fromString((String)"20221209")));
        this.checkFileAndRowSize(table, 5L, 30000L, 1, 9L);
        Assertions.assertThat((boolean)fileIO.exists(new Path(externalPath2))).isTrue();
        Assertions.assertThat((int)fileIO.listStatus(new Path(externalPath2)).length).isGreaterThanOrEqualTo(1);
    }

    @Test
    public void testUnawareBucketBatchCompact() throws Exception {
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
        FileStoreTable table = this.prepareTable(Collections.singletonList("k"), Collections.emptyList(), Collections.emptyList(), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);
        this.runActionForUnawareTable(false);
        this.checkFileAndRowSize(table, 3L, 0L, 1, 6L);
    }

    @Test
    public void testLotsOfPartitionsCompact() throws Exception {
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
        FileStoreTable table = this.prepareTable(Collections.singletonList("k"), Collections.emptyList(), Collections.emptyList(), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);
        ArrayList<String> partitions = new ArrayList<String>();
        for (int i = 0; i < 1000; ++i) {
            partitions.add("--partition");
            partitions.add("k=" + i);
        }
        this.runActionForUnawareTable(false, partitions);
        this.checkFileAndRowSize(table, 3L, 0L, 1, 6L);
    }

    @Test
    public void testTableConf() throws Exception {
        this.prepareTable(Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyList(), Collections.emptyMap());
        CompactAction compactAction = this.createAction(CompactAction.class, "compact", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--table_conf", FlinkConnectorOptions.SCAN_PARALLELISM.key() + "=6");
        Assertions.assertThat((String)((String)compactAction.table.options().get(FlinkConnectorOptions.SCAN_PARALLELISM.key()))).isEqualTo("6");
    }

    @Test
    public void testSpecifyNonPartitionField() throws Exception {
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        this.prepareTable(Collections.singletonList("v"), Arrays.asList(new String[0]), Collections.emptyList(), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        Assertions.assertThatThrownBy(() -> this.runAction(false)).hasMessage("Only partition key can be specialized in compaction action.");
    }

    @Test
    public void testWrongUsage() throws Exception {
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        this.prepareTable(Collections.singletonList("v"), Arrays.asList(new String[0]), Collections.emptyList(), tableOptions);
        Assertions.assertThatThrownBy(() -> this.createAction(CompactAction.class, "compact", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--partition_idle_time", "5s", "--order_strategy", "zorder", "--order_by", "dt,hh")).hasMessage("sort compact do not support 'partition_idle_time'.");
    }

    private void runAction(boolean isStreaming) throws Exception {
        this.runAction(isStreaming, false, Collections.emptyList());
    }

    private void runActionForUnawareTable(boolean isStreaming, List<String> extra) throws Exception {
        this.runAction(isStreaming, true, extra);
    }

    private void runActionForUnawareTable(boolean isStreaming) throws Exception {
        this.runAction(isStreaming, true, Collections.emptyList());
    }

    private void runAction(boolean isStreaming, boolean unawareBucket, List<String> extra) throws Exception {
        StreamExecutionEnvironment env = isStreaming ? this.streamExecutionEnvironmentBuilder().streamingMode().build() : this.streamExecutionEnvironmentBuilder().batchMode().build();
        ArrayList baseArgs = Lists.newArrayList((Object[])new String[]{"compact", "--database", this.database, "--table", this.tableName});
        ThreadLocalRandom random = ThreadLocalRandom.current();
        if (random.nextBoolean()) {
            baseArgs.addAll(Lists.newArrayList((Object[])new String[]{"--warehouse", this.warehouse}));
        } else {
            baseArgs.addAll(Lists.newArrayList((Object[])new String[]{"--catalog_conf", "warehouse=" + this.warehouse}));
        }
        if (unawareBucket) {
            if (random.nextBoolean()) {
                baseArgs.addAll(Lists.newArrayList((Object[])new String[]{"--where", "k=1"}));
            } else {
                baseArgs.addAll(Lists.newArrayList((Object[])new String[]{"--partition", "k=1"}));
            }
        } else if (random.nextBoolean()) {
            baseArgs.addAll(Lists.newArrayList((Object[])new String[]{"--where", "(dt=20221208 and hh=15) or (dt=20221209 and hh=15)"}));
        } else {
            baseArgs.addAll(Lists.newArrayList((Object[])new String[]{"--partition", "dt=20221208,hh=15", "--partition", "dt=20221209,hh=15"}));
        }
        baseArgs.addAll(extra);
        CompactAction action = this.createAction(CompactAction.class, baseArgs.toArray(new String[0]));
        action.withStreamExecutionEnvironment(env).build();
        if (isStreaming) {
            env.executeAsync();
        } else {
            env.execute();
        }
    }
}

