/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.operator;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.TaskResult;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class ExpireSnapshotsProcessor
extends ProcessFunction<Trigger, TaskResult> {
    private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class);
    public static final OutputTag<String> DELETE_STREAM = new OutputTag("expire-snapshots-file-deletes-stream", Types.STRING);
    private final TableLoader tableLoader;
    private final Long maxSnapshotAgeMs;
    private final Integer numSnapshots;
    private final Integer plannerPoolSize;
    private final Boolean cleanExpiredMetadata;
    private transient ExecutorService plannerPool;
    private transient Table table;

    public ExpireSnapshotsProcessor(TableLoader tableLoader, Long maxSnapshotAgeMs, Integer numSnapshots, Integer plannerPoolSize, Boolean cleanExpiredMetadata) {
        Preconditions.checkNotNull((Object)tableLoader, (Object)"Table loader should not be null");
        this.tableLoader = tableLoader;
        this.maxSnapshotAgeMs = maxSnapshotAgeMs;
        this.numSnapshots = numSnapshots;
        this.plannerPoolSize = plannerPoolSize;
        this.cleanExpiredMetadata = cleanExpiredMetadata;
    }

    public void open(Configuration parameters) throws Exception {
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();
        this.plannerPool = this.plannerPoolSize != null ? ThreadPools.newFixedThreadPool((String)(this.table.name() + "-table--planner"), (int)this.plannerPoolSize) : ThreadPools.getWorkerPool();
    }

    public void processElement(Trigger trigger, ProcessFunction.Context ctx, Collector<TaskResult> out) throws Exception {
        try {
            this.table.refresh();
            ExpireSnapshots expireSnapshots = this.table.expireSnapshots();
            if (this.maxSnapshotAgeMs != null) {
                expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - this.maxSnapshotAgeMs);
            }
            if (this.numSnapshots != null) {
                expireSnapshots = expireSnapshots.retainLast(this.numSnapshots.intValue());
            }
            if (this.cleanExpiredMetadata != null) {
                expireSnapshots.cleanExpiredMetadata(this.cleanExpiredMetadata.booleanValue());
            }
            AtomicLong deleteFileCounter = new AtomicLong(0L);
            expireSnapshots.planWith(this.plannerPool).deleteWith(file -> {
                ctx.output(DELETE_STREAM, file);
                deleteFileCounter.incrementAndGet();
            }).cleanExpiredFiles(true).commit();
            LOG.info("Successfully finished expiring snapshots for {} at {}. Scheduled {} files for delete.", new Object[]{this.table, ctx.timestamp(), deleteFileCounter.get()});
            out.collect((Object)new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList()));
        }
        catch (Exception e) {
            LOG.error("Failed to expiring snapshots for {} at {}", new Object[]{this.table, ctx.timestamp(), e});
            out.collect((Object)new TaskResult(trigger.taskId(), trigger.timestamp(), false, Lists.newArrayList((Object[])new Exception[]{e})));
        }
    }

    public void close() throws Exception {
        super.close();
        this.tableLoader.close();
        if (this.plannerPoolSize != null) {
            this.plannerPool.shutdown();
        }
    }
}

