/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.operator;

import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DeleteFilesProcessor
extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<String, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(DeleteFilesProcessor.class);
    private final String tableName;
    private final String taskName;
    private final int taskIndex;
    private final SupportsBulkOperations io;
    private final Set<String> filesToDelete = Sets.newHashSet();
    private final int batchSize;
    private transient Counter failedCounter;
    private transient Counter succeededCounter;

    public DeleteFilesProcessor(Table table, String taskName, int taskIndex, int batchSize) {
        Preconditions.checkNotNull((Object)taskName, (Object)"Task name should no be null");
        Preconditions.checkNotNull((Object)table, (Object)"Table should no be null");
        FileIO fileIO = table.io();
        Preconditions.checkArgument((boolean)(fileIO instanceof SupportsBulkOperations), (String)"%s doesn't support bulk delete", (Object)fileIO.getClass().getSimpleName());
        this.tableName = table.name();
        this.taskName = taskName;
        this.taskIndex = taskIndex;
        this.io = (SupportsBulkOperations)fileIO;
        this.batchSize = batchSize;
    }

    public void open() throws Exception {
        MetricGroup taskMetricGroup = TableMaintenanceMetrics.groupFor((RuntimeContext)this.getRuntimeContext(), this.tableName, this.taskName, this.taskIndex);
        this.failedCounter = taskMetricGroup.counter("deleteFailed");
        this.succeededCounter = taskMetricGroup.counter("deleteSucceeded");
    }

    public void processElement(StreamRecord<String> element) throws Exception {
        if (element.isRecord()) {
            this.filesToDelete.add((String)element.getValue());
        }
        if (this.filesToDelete.size() >= this.batchSize) {
            this.deleteFiles();
        }
    }

    public void processWatermark(Watermark mark) {
        this.deleteFiles();
    }

    public void prepareSnapshotPreBarrier(long checkpointId) {
        this.deleteFiles();
    }

    private void deleteFiles() {
        try {
            this.io.deleteFiles(this.filesToDelete);
            LOG.info("Deleted {} files from table {} using bulk deletes", (Object)this.filesToDelete.size(), (Object)this.tableName);
            this.succeededCounter.inc((long)this.filesToDelete.size());
            this.filesToDelete.clear();
        }
        catch (BulkDeletionFailureException e) {
            int deletedFilesCount = this.filesToDelete.size() - e.numberFailedObjects();
            LOG.warn("Deleted only {} of {} files from table {} using bulk deletes", new Object[]{deletedFilesCount, this.filesToDelete.size(), this.tableName, e});
            this.succeededCounter.inc((long)deletedFilesCount);
            this.failedCounter.inc((long)e.numberFailedObjects());
        }
    }
}

