/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.RemoveOrphanFilesAction;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Shared integration tests for {@link RemoveOrphanFilesAction} and the
 * {@code sys.remove_orphan_files} procedure.
 *
 * <p>Every test is parameterized over the procedure call style (named vs. positional arguments).
 * Subclasses whose environment cannot use named arguments override
 * {@link #supportNamedArgument()} to skip the named variant.
 */
public abstract class RemoveOrphanFilesActionITCaseBase extends ActionITCaseBase {
    private static final String ORPHAN_FILE_1 = "bucket-0/orphan_file1";
    private static final String ORPHAN_FILE_2 = "bucket-0/orphan_file2";

    /** A fixed {@code --older_than} value passed to the action in several tests. */
    private static final String FIXED_OLDER_THAN = "2023-12-31 23:59:59";

    /** Table pattern used by the tests that target every table of the database. */
    private static final String ALL_TABLES = "*";

    /**
     * Creates a table with columns {@code k BIGINT} (primary key) and {@code v STRING}, writes one
     * row through the inherited {@code write}/{@code commit} fields, and then drops two one-byte
     * files ({@link #ORPHAN_FILE_1}, {@link #ORPHAN_FILE_2}) under the table location.
     *
     * @param tableName name of the table to create
     * @return the created table
     * @throws Exception if table creation, writing, or file creation fails
     */
    private FileStoreTable createTableAndWriteData(String tableName) throws Exception {
        RowType rowType = RowType.of(
                new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
                new String[] {"k", "v"});
        FileStoreTable table = this.createFileStoreTable(
                tableName,
                rowType,
                Collections.emptyList(),
                Collections.singletonList("k"),
                Collections.emptyList(),
                Collections.emptyMap());

        // This method is called more than once per test (see
        // testRemoveDatabaseOrphanFilesITCase); close whatever a previous call left in the
        // inherited fields so it is not silently dropped when the fields are reassigned.
        // NOTE(review): assumes the inherited write/commit fields are AutoCloseable - confirm
        // against ActionITCaseBase.
        if (this.write != null) {
            this.write.close();
        }
        if (this.commit != null) {
            this.commit.close();
        }

        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = writeBuilder.newWrite();
        this.commit = writeBuilder.newCommit();
        this.writeData(this.rowData(1L, BinaryString.fromString("Hi")));

        FileIO fileIO = table.fileIO();
        fileIO.writeFile(this.getOrphanFilePath(table, ORPHAN_FILE_1), "a", true);
        fileIO.writeFile(this.getOrphanFilePath(table, ORPHAN_FILE_2), "b", true);

        // Presumably gives the orphan files a modification time that is clearly earlier than the
        // "older_than" timestamp the tests compute afterwards - confirm before shortening.
        Thread.sleep(2000L);
        return table;
    }

    /** Resolves {@code orphanFile} relative to the location of {@code table}. */
    private Path getOrphanFilePath(FileStoreTable table, String orphanFile) {
        return new Path(table.location(), orphanFile);
    }

    /** Skips the current invocation when named arguments are requested but not supported. */
    private void assumeArgumentStyleSupported(boolean isNamedArgument) {
        Assumptions.assumeTrue(!isNamedArgument || this.supportNamedArgument());
    }

    /** Builds a {@link RemoveOrphanFilesAction} from {@code args} and asserts that it runs cleanly. */
    private void assertActionRuns(List<String> args) throws Exception {
        RemoveOrphanFilesAction action = this.createAction(RemoveOrphanFilesAction.class, args);
        Assertions.assertThatCode(() -> action.run()).doesNotThrowAnyException();
    }

    /**
     * Executes {@code sql}, materializes all result rows, and closes the result iterator.
     *
     * @param sql statement to execute
     * @return an immutable snapshot of the returned rows
     * @throws Exception if execution or closing the iterator fails
     */
    private List<Row> collectRows(String sql) throws Exception {
        try (CloseableIterator<Row> rows = this.executeSQL(sql)) {
            return ImmutableList.copyOf(rows);
        }
    }

    /** Formats the current wall-clock time with millisecond precision for use as {@code older_than}. */
    private static String currentTimestamp() {
        return DateTimeUtils.formatLocalDateTime(
                DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
    }

    /** Procedure call with only the table argument. */
    private String callWithoutOlderThan(boolean isNamedArgument, String table) {
        return String.format(
                isNamedArgument
                        ? "CALL sys.remove_orphan_files(`table` => '%s.%s')"
                        : "CALL sys.remove_orphan_files('%s.%s')",
                this.database,
                table);
    }

    /** Procedure call with {@code older_than} and {@code dry_run => true}. */
    private String callDryRun(boolean isNamedArgument, String table, String olderThan) {
        return String.format(
                isNamedArgument
                        ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true)"
                        : "CALL sys.remove_orphan_files('%s.%s', '%s', true)",
                this.database,
                table,
                olderThan);
    }

    /** Procedure call with {@code older_than} only (files are really deleted). */
    private String callWithOlderThan(boolean isNamedArgument, String table, String olderThan) {
        return String.format(
                isNamedArgument
                        ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s')"
                        : "CALL sys.remove_orphan_files('%s.%s', '%s')",
                this.database,
                table,
                olderThan);
    }

    /** Dry-run procedure call with parallelism 5 and the given {@code mode}. */
    private String callWithMode(boolean isNamedArgument, String table, String olderThan, String mode) {
        return String.format(
                isNamedArgument
                        ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true, parallelism => 5, mode => '%s')"
                        : "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, '%s')",
                this.database,
                table,
                olderThan,
                mode);
    }

    /**
     * Runs the action and the procedure against a single table, then checks how the procedure
     * treats the bucket directory once the snapshot directory has been deleted.
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testRunWithoutException(boolean isNamedArgument) throws Exception {
        this.assumeArgumentStyleSupported(isNamedArgument);
        FileStoreTable table = this.createTableAndWriteData(this.tableName);

        List<String> args = new ArrayList<>(Arrays.asList(
                "remove_orphan_files",
                "--warehouse", this.warehouse,
                "--database", this.database,
                "--table", this.tableName));
        this.assertActionRuns(args);
        args.add("--older_than");
        args.add(FIXED_OLDER_THAN);
        this.assertActionRuns(args);

        Assertions.assertThat(this.collectRows(this.callWithoutOlderThan(isNamedArgument, this.tableName)))
                .containsOnly(Row.of("0"));

        String olderThan = currentTimestamp();
        Assertions.assertThat(this.collectRows(this.callDryRun(isNamedArgument, this.tableName, olderThan)))
                .containsOnly(Row.of("2"));

        String withOlderThan = this.callWithOlderThan(isNamedArgument, this.tableName, olderThan);
        // Two result rows, both "2". NOTE(review): presumably the deleted file count and the total
        // deleted bytes of the two one-byte orphan files - confirm against the procedure.
        Assertions.assertThat(this.collectRows(withOlderThan))
                .containsExactlyInAnyOrder(Row.of("2"), Row.of("2"));

        FileIO fileIO = table.fileIO();
        Path location = table.location();
        Path bucketDir = new Path(location, "bucket-0");

        // With the snapshot directory gone, the first run leaves the bucket directory in place
        // but without sub-directories ...
        fileIO.delete(new Path(location, "snapshot"), true);
        this.collectRows(withOlderThan);
        Assertions.assertThat(fileIO.exists(bucketDir)).isTrue();
        Assertions.assertThat(fileIO.listDirectories(bucketDir)).isEmpty();

        // ... and a second run removes the bucket directory itself while keeping the table root.
        this.collectRows(withOlderThan);
        Assertions.assertThat(fileIO.exists(bucketDir)).isFalse();
        Assertions.assertThat(fileIO.exists(location)).isTrue();
    }

    /**
     * Runs the action and the procedure against every table of the database (two tables with two
     * orphan files each).
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws Exception {
        this.assumeArgumentStyleSupported(isNamedArgument);
        this.createTableAndWriteData("tableName1");
        this.createTableAndWriteData("tableName2");

        List<String> args = new ArrayList<>(Arrays.asList(
                "remove_orphan_files",
                "--warehouse", this.warehouse,
                "--database", this.database));
        // Randomly exercise both the implicit (no --table) and the explicit "*" form.
        if (ThreadLocalRandom.current().nextBoolean()) {
            args.add("--table");
            args.add(ALL_TABLES);
        }
        this.assertActionRuns(args);
        args.add("--older_than");
        args.add(FIXED_OLDER_THAN);
        this.assertActionRuns(args);
        args.add("--parallelism");
        args.add("5");
        this.assertActionRuns(args);

        Assertions.assertThat(this.collectRows(this.callWithoutOlderThan(isNamedArgument, ALL_TABLES)))
                .containsOnly(Row.of("0"));

        String withParallelism = String.format(
                "CALL sys.remove_orphan_files('%s.%s','',true,5)", this.database, ALL_TABLES);
        Assertions.assertThat(this.collectRows(withParallelism)).containsOnly(Row.of("0"));

        String olderThan = currentTimestamp();
        Assertions.assertThat(this.collectRows(this.callDryRun(isNamedArgument, ALL_TABLES, olderThan)))
                .containsOnly(Row.of("4"));
        Assertions.assertThat(this.collectRows(this.callWithOlderThan(isNamedArgument, ALL_TABLES, olderThan)))
                .containsOnly(Row.of("4"));
    }

    /**
     * Creates two branches with one extra orphan file each and checks that the procedure reports
     * four orphan files in total (two in the main table, one per branch).
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testCleanWithBranch(boolean isNamedArgument) throws Exception {
        this.assumeArgumentStyleSupported(isNamedArgument);
        FileStoreTable table = this.createTableAndWriteData(this.tableName);

        // Branch "br": evolve the schema on the branch and commit one row to it.
        table.createBranch("br");
        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location(), "br");
        TableSchema branchSchema =
                schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.INT()));
        Options branchOptions = new Options(branchSchema.options());
        branchOptions.set(CoreOptions.BRANCH, "br");
        branchSchema = branchSchema.copy(branchOptions.toMap());
        FileStoreTable branchTable =
                FileStoreTableFactory.create(table.fileIO(), table.location(), branchSchema);

        String branchCommitUser = UUID.randomUUID().toString();
        // Resources close in reverse declaration order, i.e. the writer first and then the
        // committer, and both are released even if the write or the commit throws.
        try (TableCommitImpl branchCommit = branchTable.newCommit(branchCommitUser);
                TableWriteImpl<?> branchWrite = branchTable.newWrite(branchCommitUser)) {
            branchWrite.write(GenericRow.of(2L, BinaryString.fromString("Hello"), 20));
            branchCommit.commit(1L, branchWrite.prepareCommit(false, 1L));
        }

        Path orphanFile3 = new Path(table.location(), "branch/branch-br/snapshot/orphan_file3");
        branchTable.fileIO().writeFile(orphanFile3, "x", true);

        // Branch "br2": no data, only an orphan file in its snapshot directory.
        table.createBranch("br2");
        Path orphanFile4 = new Path(table.location(), "branch/branch-br2/snapshot/orphan_file4");
        branchTable.fileIO().writeFile(orphanFile4, "y", true);

        // Presumably ages the branch orphan files past the timestamp computed below - confirm.
        Thread.sleep(2000L);

        // Randomly also cover the case where the table declares "br" as its fallback branch.
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.executeSQL(String.format("ALTER TABLE `%s`.`%s` SET ('%s' = 'br')", this.database, this.tableName, CoreOptions.SCAN_FALLBACK_BRANCH.key()), false, true);
        }

        String olderThan = currentTimestamp();
        Assertions.assertThat(this.collectRows(this.callWithOlderThan(isNamedArgument, ALL_TABLES, olderThan)))
                .containsOnly(Row.of("4"));
    }

    /**
     * Checks the {@code mode} argument of the procedure: {@code local} and {@code distributed}
     * report the same dry-run result, and an unrecognized mode is rejected.
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testRunWithMode(boolean isNamedArgument) throws Exception {
        this.assumeArgumentStyleSupported(isNamedArgument);
        this.createTableAndWriteData(this.tableName);

        List<String> args = new ArrayList<>(Arrays.asList(
                "remove_orphan_files",
                "--warehouse", this.warehouse,
                "--database", this.database,
                "--table", this.tableName));
        this.assertActionRuns(args);
        args.add("--older_than");
        args.add(FIXED_OLDER_THAN);
        this.assertActionRuns(args);

        Assertions.assertThat(this.collectRows(this.callWithoutOlderThan(isNamedArgument, this.tableName)))
                .containsOnly(Row.of("0"));

        String olderThan = currentTimestamp();
        Assertions.assertThat(this.collectRows(this.callWithMode(isNamedArgument, this.tableName, olderThan, "local")))
                .containsOnly(Row.of("2"));
        Assertions.assertThat(this.collectRows(this.callWithMode(isNamedArgument, this.tableName, olderThan, "distributed")))
                .containsOnly(Row.of("2"));

        String withInvalidMode = this.callWithMode(isNamedArgument, this.tableName, olderThan, "unknown");
        // The statement is only submitted, not iterated: the failure is expected from the call
        // itself. assertThatThrownBy fails with a clear message when nothing is thrown.
        Assertions.assertThatThrownBy(() -> this.executeSQL(withInvalidMode))
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Unknown mode");
    }

    /**
     * Whether the procedure can be called with named arguments in the environment under test.
     *
     * @return {@code true} by default; when an override returns {@code false} the named-argument
     *     variant of every test is skipped
     */
    protected boolean supportNamedArgument() {
        return true;
    }
}

