/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;

/**
 * Shared helpers for tests that extend this class: polling a streaming scan until an expected
 * number of rows has been read, waiting for a given snapshot and checking its files, asserting on
 * the latest snapshot, and preparing a table together with its stream writer and committer.
 */
public class CompactActionITCaseBase
extends ActionITCaseBase {
    /** Column types of {@link #ROW_TYPE}, in field order. */
    protected static final DataType[] FIELD_TYPES = new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()};

    /** Row type with the fields {@code k}, {@code v}, {@code hh} and {@code dt}. */
    protected static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[]{"k", "v", "hh", "dt"});

    /**
     * Repeatedly plans {@code scan}, reads the planned splits and accumulates the rows until the
     * number of collected rows equals {@code expected.size()} or the timeout has elapsed, then
     * asserts that the collected rows, sorted in natural string order, equal {@code expected}.
     *
     * <p>The comparison is order-sensitive, so {@code expected} must already be sorted in natural
     * string order. The timeout is only checked after a poll, so a poll that is in progress is
     * never cut short.
     *
     * @param table table used to create the reader for the planned splits
     * @param rowType row type handed to {@code getResult} when reading the splits
     * @param scan streaming scan that is polled for new plans
     * @param expected expected rows
     * @param timeout maximum polling time in milliseconds
     * @throws TimeoutException if the number of collected rows differs from
     *     {@code expected.size()} once polling stops
     * @throws Exception if planning or reading fails
     */
    protected void validateResult(FileStoreTable table, RowType rowType, StreamTableScan scan, List<String> expected, long timeout) throws Exception {
        List<String> actual = new ArrayList<>();
        long start = System.currentTimeMillis();
        while (actual.size() != expected.size()) {
            TableScan.Plan plan = scan.plan();
            actual.addAll(this.getResult(table.newReadBuilder().newRead(), plan.splits(), rowType));
            if (System.currentTimeMillis() - start > timeout) {
                break;
            }
        }
        if (actual.size() != expected.size()) {
            throw new TimeoutException(String.format("Cannot collect %s records in %s milliseconds.", expected.size(), timeout));
        }
        actual.sort(String::compareTo);
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /**
     * Waits until the latest snapshot id of {@code table} equals {@code expectedSnapshotId},
     * checking every 500 ms, and then asserts the number of {@link FileKind#ADD} manifest entries
     * in the latest snapshot and the sum of their row counts.
     *
     * @param table table whose snapshots and files are inspected
     * @param expectedSnapshotId snapshot id to wait for
     * @param timeout maximum waiting time in milliseconds; only read while waiting
     * @param fileNum expected number of {@code ADD} entries
     * @param rowCount expected total row count over those entries
     * @throws RuntimeException if the expected snapshot does not become the latest one in time
     * @throws NullPointerException if the table has no latest snapshot after the wait
     * @throws Exception if the wait is interrupted or the scan fails
     */
    protected void checkFileAndRowSize(FileStoreTable table, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount) throws Exception {
        SnapshotManager snapshotManager = table.snapshotManager();
        FileStoreScan scan = table.store().newScan();
        long start = System.currentTimeMillis();
        while (!Objects.equals(snapshotManager.latestSnapshotId(), expectedSnapshotId)) {
            Thread.sleep(500L);
            if (System.currentTimeMillis() - start > timeout) {
                throw new RuntimeException("can't wait for a compaction.");
            }
        }
        // Fail with a descriptive message instead of a bare NPE from unboxing a missing id.
        long latestId = Objects.requireNonNull(snapshotManager.latestSnapshotId(), "table has no latest snapshot");
        // Typed list: iterating a raw List as ManifestEntry does not compile.
        List<ManifestEntry> files = scan.withSnapshot(latestId).plan().files(FileKind.ADD);
        long count = 0L;
        for (ManifestEntry file : files) {
            count += file.file().rowCount();
        }
        Assertions.assertThat(files.size()).isEqualTo(fileNum);
        Assertions.assertThat(count).isEqualTo(rowCount);
    }

    /**
     * Asserts that the latest snapshot of {@code table} has the given id and commit kind.
     *
     * @param table table whose latest snapshot is inspected
     * @param snapshotId expected id of the latest snapshot
     * @param commitKind expected commit kind of the latest snapshot
     * @throws NullPointerException if the table has no latest snapshot
     */
    protected void checkLatestSnapshot(FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) {
        SnapshotManager snapshotManager = table.snapshotManager();
        // Fail with a descriptive message instead of a bare NPE from unboxing a missing id.
        long latestId = Objects.requireNonNull(snapshotManager.latestSnapshotId(), "table has no latest snapshot");
        Snapshot snapshot = snapshotManager.snapshot(latestId);
        Assertions.assertThat(snapshot.id()).isEqualTo(snapshotId);
        Assertions.assertThat(snapshot.commitKind()).isEqualTo(commitKind);
    }

    /**
     * Creates a table with {@link #ROW_TYPE} and the given keys and options, and initialises the
     * {@code write} and {@code commit} fields from a stream write builder that uses
     * {@code commitUser}.
     *
     * @param partitionKeys partition key field names
     * @param primaryKeys primary key field names
     * @param bucketKey bucket key field names
     * @param tableOptions table options passed to {@code createFileStoreTable}
     * @return the created table
     * @throws Exception if the table cannot be created
     */
    protected FileStoreTable prepareTable(List<String> partitionKeys, List<String> primaryKeys, List<String> bucketKey, Map<String, String> tableOptions) throws Exception {
        FileStoreTable table = this.createFileStoreTable(ROW_TYPE, partitionKeys, primaryKeys, bucketKey, tableOptions);
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = streamWriteBuilder.newWrite();
        this.commit = streamWriteBuilder.newCommit();
        return table;
    }
}

