/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.operator;

import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics;
import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class MetadataTablePlanner
extends ProcessFunction<Trigger, SplitInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(MetadataTablePlanner.class);
    private final String taskName;
    private final int taskIndex;
    private final TableLoader tableLoader;
    private final int workerPoolSize;
    private final ScanContext scanContext;
    private transient ExecutorService workerPool;
    private transient Counter errorCounter;
    private transient Table table;
    private transient IcebergSourceSplitSerializer splitSerializer;
    private final MetadataTableType metadataTableType;

    public MetadataTablePlanner(String taskName, int taskIndex, TableLoader tableLoader, ScanContext scanContext, MetadataTableType metadataTableType, int workerPoolSize) {
        Preconditions.checkNotNull((Object)taskName, (Object)"Task name should no be null");
        Preconditions.checkNotNull((Object)tableLoader, (Object)"Table should no be null");
        Preconditions.checkArgument((boolean)scanContext.isStreaming(), (Object)"Streaming should be set to true");
        this.taskName = taskName;
        this.taskIndex = taskIndex;
        this.tableLoader = tableLoader;
        this.scanContext = scanContext;
        this.workerPoolSize = workerPoolSize;
        this.metadataTableType = metadataTableType;
    }

    public void open(OpenContext openContext) throws Exception {
        this.tableLoader.open();
        Table originalTable = this.tableLoader.loadTable();
        this.table = MetadataTableUtils.createMetadataTableInstance((Table)originalTable, (MetadataTableType)this.metadataTableType);
        this.workerPool = ThreadPools.newFixedThreadPool((String)(this.table.name() + "-table-planner"), (int)this.workerPoolSize);
        this.splitSerializer = new IcebergSourceSplitSerializer(this.scanContext.caseSensitive());
        this.errorCounter = TableMaintenanceMetrics.groupFor(this.getRuntimeContext(), originalTable.name(), this.taskName, this.taskIndex).counter("error");
    }

    public void processElement(Trigger trigger, ProcessFunction.Context ctx, Collector<SplitInfo> out) throws Exception {
        try {
            this.table.refresh();
            for (IcebergSourceSplit split : FlinkSplitPlanner.planIcebergSourceSplits(this.table, this.scanContext, this.workerPool)) {
                out.collect((Object)new SplitInfo(this.splitSerializer.getVersion(), this.splitSerializer.serialize(split)));
            }
        }
        catch (Exception e) {
            LOG.warn("Exception planning scan for {} at {}", new Object[]{this.table, ctx.timestamp(), e});
            ctx.output(DeleteOrphanFiles.ERROR_STREAM, (Object)e);
            this.errorCounter.inc();
        }
    }

    public void close() throws Exception {
        super.close();
        this.tableLoader.close();
        if (this.workerPool != null) {
            this.workerPool.shutdown();
        }
    }

    public static class SplitInfo {
        private final int version;
        private final byte[] split;

        public SplitInfo(int version, byte[] split) {
            this.version = version;
            this.split = split;
        }

        public int version() {
            return this.version;
        }

        public byte[] split() {
            return this.split;
        }
    }
}

