/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.operator;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner;
import org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics;
import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DataFileRewriteRunner
extends ProcessFunction<DataFileRewritePlanner.PlannedGroup, ExecutedGroup> {
    private static final Logger LOG = LoggerFactory.getLogger(DataFileRewriteRunner.class);
    private final String tableName;
    private final String taskName;
    private final int taskIndex;
    private transient int subTaskId;
    private transient int attemptId;
    private transient Counter errorCounter;

    public DataFileRewriteRunner(String tableName, String taskName, int taskIndex) {
        Preconditions.checkNotNull((Object)tableName, (Object)"Table name should no be null");
        Preconditions.checkNotNull((Object)taskName, (Object)"Task name should no be null");
        this.tableName = tableName;
        this.taskName = taskName;
        this.taskIndex = taskIndex;
    }

    public void open(Configuration parameters) {
        this.errorCounter = TableMaintenanceMetrics.groupFor(this.getRuntimeContext(), this.tableName, this.taskName, this.taskIndex).counter("error");
        this.subTaskId = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        this.attemptId = this.getRuntimeContext().getTaskInfo().getAttemptNumber();
    }

    public void processElement(DataFileRewritePlanner.PlannedGroup value, ProcessFunction.Context ctx, Collector<ExecutedGroup> out) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("[For table {} with {}[{}] at {}]: Rewriting files for group {} with files: {}", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp(), value.group().info(), value.group().rewrittenFiles()});
        } else {
            LOG.info("[For table {} with {}[{}] at {}]: Rewriting files for group {} with {} number of files", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp(), value.group().info(), value.group().rewrittenFiles().size()});
        }
        try (TaskWriter<RowData> writer = this.writerFor(value);){
            try (DataIterator<RowData> iterator = this.readerFor(value);){
                while (iterator.hasNext()) {
                    writer.write((Object)iterator.next());
                }
                HashSet dataFiles = Sets.newHashSet((Object[])writer.dataFiles());
                value.group().setOutputFiles((Set)dataFiles);
                out.collect((Object)new ExecutedGroup(value.table().currentSnapshot().snapshotId(), value.groupsPerCommit(), value.group()));
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[For table {} with {}[{}] at {}]: Rewritten files {} from {} to {}", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp(), value.group().info(), value.group().rewrittenFiles(), value.group().addedFiles()});
                } else {
                    LOG.info("[For table {} with {}[{}] at {}]: Rewritten {} files to {} files", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp(), value.group().rewrittenFiles().size(), value.group().addedFiles().size()});
                }
            }
            catch (Exception ex) {
                LOG.info("[For table {} with {}[{}] at {}]: Exception rewriting datafile group {}", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp(), value.group(), ex});
                ctx.output(TaskResultAggregator.ERROR_STREAM, (Object)ex);
                this.errorCounter.inc();
                this.abort(writer, ctx.timestamp());
            }
        }
        catch (Exception ex) {
            LOG.info("[For table {} with {}[{}] at {}]: Exception creating compaction writer for group {}", new Object[]{this.tableName, this.taskName, this.taskIndex, ctx.timestamp(), value.group(), ex});
            ctx.output(TaskResultAggregator.ERROR_STREAM, (Object)ex);
            this.errorCounter.inc();
        }
    }

    private TaskWriter<RowData> writerFor(DataFileRewritePlanner.PlannedGroup value) {
        String formatString = PropertyUtil.propertyAsString((Map)value.table().properties(), (String)"write.format.default", (String)"parquet");
        RowDataTaskWriterFactory factory = new RowDataTaskWriterFactory((Table)value.table(), FlinkSchemaUtil.convert(value.table().schema()), value.group().inputSplitSize(), FileFormat.fromString((String)formatString), (Map<String, String>)value.table().properties(), null, false);
        factory.initialize(this.subTaskId, this.attemptId);
        return factory.create();
    }

    private DataIterator<RowData> readerFor(DataFileRewritePlanner.PlannedGroup value) {
        RowDataFileScanTaskReader reader = new RowDataFileScanTaskReader(value.table().schema(), value.table().schema(), PropertyUtil.propertyAsString((Map)value.table().properties(), (String)"schema.name-mapping.default", null), false, Collections.emptyList());
        return new DataIterator<RowData>(reader, (CombinedScanTask)new BaseCombinedScanTask(value.group().fileScanTasks()), value.table().io(), value.table().encryption());
    }

    private void abort(TaskWriter<RowData> writer, long timestamp) {
        try {
            LOG.info("[For table {} with {}[{}] at {}]: Aborting rewrite for (subTaskId {}, attemptId {})", new Object[]{this.tableName, this.taskName, this.taskIndex, timestamp, this.subTaskId, this.attemptId});
            writer.abort();
        }
        catch (Exception e) {
            LOG.info("[For table {} with {}[{}] at {}]: Exception in abort", new Object[]{this.tableName, this.taskName, this.taskIndex, timestamp, e});
        }
    }

    public static class ExecutedGroup {
        private final long snapshotId;
        private final int groupsPerCommit;
        private final RewriteFileGroup group;

        @VisibleForTesting
        ExecutedGroup(long snapshotId, int groupsPerCommit, RewriteFileGroup group) {
            this.snapshotId = snapshotId;
            this.groupsPerCommit = groupsPerCommit;
            this.group = group;
        }

        long snapshotId() {
            return this.snapshotId;
        }

        int groupsPerCommit() {
            return this.groupsPerCommit;
        }

        RewriteFileGroup group() {
            return this.group;
        }
    }
}

