/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.util.Map;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.source.Writer;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingWriter
extends Writer
implements StreamWriter {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingWriter.class);
    private static final String QUERY_ID_PROPERTY = "spark.sql.streaming.queryId";
    private static final String EPOCH_ID_PROPERTY = "spark.sql.streaming.epochId";
    private final String queryId;
    private final OutputMode mode;

    StreamingWriter(Table table, DataSourceOptions options, String queryId, OutputMode mode, String applicationId) {
        super(table, options, false, applicationId);
        this.queryId = queryId;
        this.mode = mode;
    }

    public void commit(long epochId, WriterCommitMessage[] messages) {
        LOG.info("Committing epoch {} for query {} in {} mode", new Object[]{epochId, this.queryId, this.mode});
        this.table().refresh();
        Long lastCommittedEpochId = this.getLastCommittedEpochId();
        if (lastCommittedEpochId != null && epochId <= lastCommittedEpochId) {
            LOG.info("Skipping epoch {} for query {} as it was already committed", (Object)epochId, (Object)this.queryId);
            return;
        }
        if (this.mode == OutputMode.Complete()) {
            OverwriteFiles overwriteFiles = this.table().newOverwrite();
            overwriteFiles.overwriteByRowFilter(Expressions.alwaysTrue());
            int numFiles = 0;
            for (DataFile file : this.files(messages)) {
                overwriteFiles.addFile(file);
                ++numFiles;
            }
            this.commit(overwriteFiles, epochId, numFiles, "streaming complete overwrite");
        } else {
            AppendFiles append = this.table().newFastAppend();
            int numFiles = 0;
            for (DataFile file : this.files(messages)) {
                append.appendFile(file);
                ++numFiles;
            }
            this.commit(append, epochId, numFiles, "streaming append");
        }
    }

    private <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId, int numFiles, String description) {
        snapshotUpdate.set(QUERY_ID_PROPERTY, this.queryId);
        snapshotUpdate.set(EPOCH_ID_PROPERTY, Long.toString(epochId));
        this.commitOperation(snapshotUpdate, numFiles, description);
    }

    public void abort(long epochId, WriterCommitMessage[] messages) {
        this.abort(messages);
    }

    private Long getLastCommittedEpochId() {
        Snapshot snapshot = this.table().currentSnapshot();
        Long lastCommittedEpochId = null;
        while (snapshot != null) {
            Map<String, String> summary = snapshot.summary();
            String snapshotQueryId = summary.get(QUERY_ID_PROPERTY);
            if (this.queryId.equals(snapshotQueryId)) {
                lastCommittedEpochId = Long.valueOf(summary.get(EPOCH_ID_PROPERTY));
                break;
            }
            Long parentSnapshotId = snapshot.parentId();
            snapshot = parentSnapshotId != null ? this.table().snapshot(parentSnapshotId) : null;
        }
        return lastCommittedEpochId;
    }
}

