/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.action.CompactActionITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class CompactActionITCase
extends CompactActionITCaseBase {
    private static final DataType[] FIELD_TYPES = new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()};
    private static final RowType ROW_TYPE = RowType.of((DataType[])FIELD_TYPES, (String[])new String[]{"k", "v", "hh", "dt"});

    @Test
    @Timeout(value=60L)
    public void testBatchCompact() throws Exception {
        FileStoreTable table = this.prepareTable(Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.writeData(this.rowData(2, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(2, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(2, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.runAction(false);
        } else {
            this.callProcedure(false);
        }
        this.checkLatestSnapshot(table, 3L, Snapshot.CommitKind.COMPACT);
        List splits = table.newSnapshotReader().read().dataSplits();
        Assertions.assertThat((int)splits.size()).isEqualTo(3);
        for (DataSplit split : splits) {
            if (split.partition().getInt(1) == 15) {
                Assertions.assertThat((int)split.dataFiles().size()).isEqualTo(1);
                continue;
            }
            Assertions.assertThat((int)split.dataFiles().size()).isEqualTo(2);
        }
    }

    @Test
    public void testStreamingCompact() throws Exception {
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
        tableOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
        tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
        tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
        FileStoreTable table = this.prepareTable(Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 1L, Snapshot.CommitKind.APPEND);
        StreamTableScan scan = table.newReadBuilder().newStreamScan();
        TableScan.Plan plan = scan.plan();
        Assertions.assertThat((List)plan.splits()).isEmpty();
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.runAction(true);
        } else {
            this.callProcedure(true);
        }
        this.validateResult(table, ROW_TYPE, scan, Arrays.asList("+I[1, 100, 15, 20221208]", "+I[1, 100, 15, 20221209]"), 60000L);
        this.writeData(this.rowData(1, 101, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 15, BinaryString.fromString((String)"20221209")));
        this.validateResult(table, ROW_TYPE, scan, Arrays.asList("+U[1, 101, 15, 20221208]", "+U[1, 101, 15, 20221209]", "-U[1, 100, 15, 20221208]", "-U[1, 100, 15, 20221209]"), 60000L);
        SnapshotManager snapshotManager = table.snapshotManager();
        CommonTestUtils.waitUtil(() -> snapshotManager.latestSnapshotId() - 2L == snapshotManager.earliestSnapshotId(), (Duration)Duration.ofSeconds(60000L), (Duration)Duration.ofSeconds(100L), (String)String.format("Cannot validate snapshot expiration in %s milliseconds.", 60000));
    }

    @Test
    public void testUnawareBucketStreamingCompact() throws Exception {
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
        tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
        FileStoreTable table = this.prepareTable(Collections.singletonList("k"), Collections.emptyList(), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.runAction(true);
        } else {
            this.callProcedure(true);
        }
        this.checkFileAndRowSize(table, 3L, 30000L, 1, 6L);
        this.writeData(this.rowData(1, 101, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 101, 15, BinaryString.fromString((String)"20221209")));
        this.checkFileAndRowSize(table, 5L, 30000L, 1, 9L);
    }

    @Test
    public void testUnawareBucketBatchCompact() throws Exception {
        HashMap<String, String> tableOptions = new HashMap<String, String>();
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
        tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
        FileStoreTable table = this.prepareTable(Collections.singletonList("k"), Collections.emptyList(), tableOptions);
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        this.checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.runAction(false);
        } else {
            this.callProcedure(false);
        }
        this.checkFileAndRowSize(table, 3L, 0L, 1, 6L);
    }

    @Test
    public void testTableConf() throws Exception {
        this.prepareTable(Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyMap());
        CompactAction compactAction = this.createAction(CompactAction.class, "compact", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--table_conf", FlinkConnectorOptions.SCAN_PARALLELISM.key() + "=6");
        Assertions.assertThat((String)((String)compactAction.table.options().get(FlinkConnectorOptions.SCAN_PARALLELISM.key()))).isEqualTo("6");
    }

    private FileStoreTable prepareTable(List<String> partitionKeys, List<String> primaryKeys, Map<String, String> tableOptions) throws Exception {
        FileStoreTable table = this.createFileStoreTable(ROW_TYPE, partitionKeys, primaryKeys, tableOptions);
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = streamWriteBuilder.newWrite();
        this.commit = streamWriteBuilder.newCommit();
        return table;
    }

    private void checkLatestSnapshot(FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) {
        SnapshotManager snapshotManager = table.snapshotManager();
        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
        Assertions.assertThat((long)snapshot.id()).isEqualTo(snapshotId);
        Assertions.assertThat((Comparable)snapshot.commitKind()).isEqualTo((Object)commitKind);
    }

    private void runAction(boolean isStreaming) throws Exception {
        StreamExecutionEnvironment env = this.buildDefaultEnv(isStreaming);
        CompactAction action = this.createAction(CompactAction.class, "compact", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--partition", "dt=20221208,hh=15", "--partition", "dt=20221209,hh=15");
        action.withStreamExecutionEnvironment(env).build();
        if (isStreaming) {
            env.executeAsync();
        } else {
            env.execute();
        }
    }

    private void callProcedure(boolean isStreaming) {
        this.callProcedure(String.format("CALL sys.compact('%s.%s', '%s')", this.database, this.tableName, "dt=20221208,hh=15;dt=20221209,hh=15"), isStreaming, !isStreaming);
    }
}

