/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.operator;

import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.FileRewritePlan;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics;
import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.math.IntMath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DataFileRewritePlanner
extends ProcessFunction<Trigger, PlannedGroup> {
    static final String MESSAGE_PREFIX = "[For table {} with {}[{}] at {}]: ";
    private static final Logger LOG = LoggerFactory.getLogger(DataFileRewritePlanner.class);
    private final String tableName;
    private final String taskName;
    private final int taskIndex;
    private final TableLoader tableLoader;
    private final int partialProgressMaxCommits;
    private final long maxRewriteBytes;
    private final Map<String, String> rewriterOptions;
    private transient Counter errorCounter;
    private final Expression filter;

    public DataFileRewritePlanner(String tableName, String taskName, int taskIndex, TableLoader tableLoader, int newPartialProgressMaxCommits, long maxRewriteBytes, Map<String, String> rewriterOptions, Expression filter) {
        Preconditions.checkNotNull(tableName, "Table name should no be null");
        Preconditions.checkNotNull(taskName, "Task name should no be null");
        Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
        Preconditions.checkNotNull(rewriterOptions, "Options map should no be null");
        this.tableName = tableName;
        this.taskName = taskName;
        this.taskIndex = taskIndex;
        this.tableLoader = tableLoader;
        this.partialProgressMaxCommits = newPartialProgressMaxCommits;
        this.maxRewriteBytes = maxRewriteBytes;
        this.rewriterOptions = rewriterOptions;
        this.filter = filter;
    }

    public void open(Configuration parameters) throws Exception {
        this.tableLoader.open();
        Table table = this.tableLoader.loadTable();
        Preconditions.checkArgument(!TableUtil.supportsRowLineage(table), "Flink does not support compaction on row lineage enabled tables (V3+)");
        this.errorCounter = TableMaintenanceMetrics.groupFor(this.getRuntimeContext(), this.tableName, this.taskName, this.taskIndex).counter("error");
    }

    public void processElement(Trigger value, ProcessFunction.Context ctx, Collector<PlannedGroup> out) throws Exception {
        LOG.info("[For table {} with {}[{}] at {}]: Creating rewrite plan", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp()});
        try {
            SerializableTable table = (SerializableTable)SerializableTable.copyOf(this.tableLoader.loadTable());
            if (table.currentSnapshot() == null) {
                LOG.info("[For table {} with {}[{}] at {}]: Nothing to plan for in an empty table", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp()});
                return;
            }
            BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table, this.filter);
            planner.init(this.rewriterOptions);
            FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> plan = planner.plan();
            long rewriteBytes = 0L;
            ArrayList<RewriteFileGroup> groups = Lists.newArrayList();
            for (RewriteFileGroup group : plan.groups()) {
                if (rewriteBytes + group.inputFilesSizeInBytes() > this.maxRewriteBytes) {
                    LOG.info("[For table {} with {}[{}] at {}]: Skipping group as max rewrite size reached {}", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp(), group});
                    continue;
                }
                rewriteBytes += group.inputFilesSizeInBytes();
                groups.add(group);
            }
            int groupsPerCommit = IntMath.divide(groups.size(), this.partialProgressMaxCommits, RoundingMode.CEILING);
            LOG.info("[For table {} with {}[{}] at {}]: Rewrite plan created {}", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp(), groups});
            for (RewriteFileGroup group : groups) {
                LOG.info("[For table {} with {}[{}] at {}]: Emitting {}", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp(), group});
                out.collect((Object)new PlannedGroup(table, groupsPerCommit, group));
            }
        }
        catch (Exception e) {
            LOG.warn("[For table {} with {}[{}] at {}]: Failed to plan data file rewrite groups", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp(), e});
            ctx.output(TaskResultAggregator.ERROR_STREAM, (Object)e);
            this.errorCounter.inc();
        }
    }

    public void close() throws Exception {
        super.close();
        this.tableLoader.close();
    }

    public static class PlannedGroup {
        private final SerializableTable table;
        private final int groupsPerCommit;
        private final RewriteFileGroup group;

        private PlannedGroup(SerializableTable table, int groupsPerCommit, RewriteFileGroup group) {
            this.table = table;
            this.groupsPerCommit = groupsPerCommit;
            this.group = group;
        }

        SerializableTable table() {
            return this.table;
        }

        int groupsPerCommit() {
            return this.groupsPerCommit;
        }

        RewriteFileGroup group() {
            return this.group;
        }
    }
}

