/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.flink.procedure.ProcedureBase;
import org.apache.paimon.table.FileStoreTable;

/**
 * Procedure registered under the identifier {@code reset_consumer}.
 *
 * <p>Given a table and a consumer id, it either re-points the consumer at a
 * caller-supplied next snapshot id or, when no snapshot id is supplied,
 * deletes the consumer.
 */
public class ResetConsumerProcedure extends ProcedureBase {
    /** Name returned by {@link #identifier()}. */
    public static final String IDENTIFIER = "reset_consumer";

    /**
     * Resets or deletes a consumer of the given table.
     *
     * @param procedureContext context handed in by the caller; not read by this method
     * @param tableId table identifier string, parsed with {@link Identifier#fromString}
     * @param consumerId id of the consumer to reset or delete
     * @param nextSnapshotId snapshot id to store for the consumer; when {@code null}
     *     the consumer is deleted instead (the argument is declared optional)
     * @return a single-element array containing {@code "Success"}
     * @throws Catalog.TableNotExistException declared checked exception; presumably
     *     raised by the catalog lookup when the table is missing (confirm against
     *     the catalog implementation)
     * @throws ClassCastException if the resolved table is not a {@link FileStoreTable}
     */
    @ProcedureHint(
            argument = {
                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
                @ArgumentHint(name = "consumer_id", type = @DataTypeHint("STRING")),
                @ArgumentHint(
                        name = "next_snapshot_id",
                        type = @DataTypeHint("BIGINT"),
                        isOptional = true)
            })
    public String[] call(
            ProcedureContext procedureContext,
            String tableId,
            String consumerId,
            Long nextSnapshotId)
            throws Catalog.TableNotExistException {
        Identifier tableIdentifier = Identifier.fromString(tableId);
        FileStoreTable table = (FileStoreTable) this.catalog.getTable(tableIdentifier);

        // The manager is bound to the table's file IO, location and the branch
        // reported by its snapshot manager.
        ConsumerManager consumers =
                new ConsumerManager(
                        table.fileIO(),
                        table.location(),
                        table.snapshotManager().branch());

        if (nextSnapshotId == null) {
            // No target snapshot supplied: remove the consumer.
            consumers.deleteConsumer(consumerId);
        } else {
            // The lookup result is discarded. NOTE(review): presumably this call
            // is here so that a non-existent snapshot id fails before the
            // consumer is rewritten -- confirm against SnapshotManager#snapshot.
            table.snapshotManager().snapshot(nextSnapshotId);
            Consumer target = new Consumer(nextSnapshotId);
            consumers.resetConsumer(consumerId, target);
        }

        return new String[] {"Success"};
    }

    /** Returns {@link #IDENTIFIER}. */
    @Override
    public String identifier() {
        return IDENTIFIER;
    }
}

