/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import java.io.IOException;
import java.util.Arrays;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

/**
 * Flink procedure registered under the identifier {@code purge_files}.
 *
 * <p>It resolves a table from the catalog, lists the entries located directly under the table
 * path and recursively deletes every one of them except the {@code schema} directory.
 */
public class PurgeFilesProcedure extends ProcedureBase {
    public static final String IDENTIFIER = "purge_files";

    /** Name of the only entry directly under the table path that the purge leaves in place. */
    private static final String SCHEMA_DIRECTORY = "schema";

    /**
     * Deletes everything directly under the table path except the {@code schema} directory.
     *
     * @param procedureContext Flink procedure context; not used by this procedure
     * @param tableId table identifier string, parsed with {@link Identifier#fromString(String)}
     * @return a single-element array holding a success message that names the purged table
     * @throws Catalog.TableNotExistException if the catalog cannot find the table
     * @throws IllegalArgumentException if the resolved table is not a {@link FileStoreTable}
     * @throws RuntimeException wrapping the {@link IOException} raised while listing the table
     *     path or while deleting one of its entries
     */
    @ProcedureHint(argument={@ArgumentHint(name="table", type=@DataTypeHint(value="STRING"))})
    public String[] call(ProcedureContext procedureContext, String tableId) throws Catalog.TableNotExistException {
        Table table = this.catalog.getTable(Identifier.fromString(tableId));
        // Fail with an explicit message instead of an opaque ClassCastException when the
        // catalog hands back a table that is not backed by a file store.
        if (!(table instanceof FileStoreTable)) {
            throw new IllegalArgumentException(
                    String.format("Table %s is not a FileStoreTable; purge_files only supports file store tables.", tableId));
        }
        FileStoreTable fileStoreTable = (FileStoreTable) table;
        FileIO fileIO = fileStoreTable.fileIO();
        Path tablePath = fileStoreTable.snapshotManager().tablePath();
        try {
            Arrays.stream(fileIO.listStatus(tablePath))
                    .map(status -> status.getPath())
                    // Exact match: a substring test would also spare unrelated entries whose
                    // names merely contain "schema" (for instance "schema_version=1"), leaving
                    // them behind while the procedure still reports success.
                    .filter(path -> !SCHEMA_DIRECTORY.equals(path.getName()))
                    .forEach(path -> PurgeFilesProcedure.deleteRecursively(fileIO, path));
        } catch (IOException e) {
            throw new RuntimeException("Failed to list files under table path " + tablePath, e);
        }
        return new String[]{String.format("Success purge files with table: %s.", fileStoreTable.name())};
    }

    /**
     * Recursively deletes {@code path}, rethrowing any {@link IOException} as an unchecked
     * exception so the call can be used inside a stream pipeline.
     */
    private static void deleteRecursively(FileIO fileIO, Path path) {
        try {
            // The boolean result of delete is ignored, as in the original implementation.
            fileIO.delete(path, true);
        } catch (IOException e) {
            throw new RuntimeException("Failed to delete " + path, e);
        }
    }

    @Override
    public String identifier() {
        return IDENTIFIER;
    }
}

