/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.Collection;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
import org.apache.iceberg.flink.sink.FlinkManifestUtil;
import org.apache.iceberg.flink.sink.IcebergCommittable;
import org.apache.iceberg.flink.sink.ManifestOutputFileFactory;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IcebergWriteAggregator
extends AbstractStreamOperator<CommittableMessage<IcebergCommittable>>
implements OneInputStreamOperator<CommittableMessage<WriteResult>, CommittableMessage<IcebergCommittable>> {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergWriteAggregator.class);
    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
    private final Collection<WriteResult> results = Sets.newHashSet();
    private transient ManifestOutputFileFactory icebergManifestOutputFileFactory;
    private transient Table table;
    private final TableLoader tableLoader;

    IcebergWriteAggregator(TableLoader tableLoader) {
        this.tableLoader = tableLoader;
    }

    public void open() throws Exception {
        if (!this.tableLoader.isOpen()) {
            this.tableLoader.open();
        }
        String flinkJobId = this.getContainingTask().getEnvironment().getJobID().toString();
        String operatorId = this.getOperatorID().toString();
        int subTaskId = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        Preconditions.checkArgument((subTaskId == 0 ? 1 : 0) != 0, (Object)"The subTaskId must be zero in the IcebergWriteAggregator");
        int attemptId = this.getRuntimeContext().getTaskInfo().getAttemptNumber();
        this.table = this.tableLoader.loadTable();
        this.icebergManifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(() -> this.table, this.table.properties(), flinkJobId, operatorId, subTaskId, attemptId);
    }

    public void finish() throws IOException {
        this.prepareSnapshotPreBarrier(Long.MAX_VALUE);
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
        IcebergCommittable committable = new IcebergCommittable(this.writeToManifest(this.results, checkpointId), this.getContainingTask().getEnvironment().getJobID().toString(), this.getRuntimeContext().getOperatorUniqueID(), checkpointId);
        CommittableSummary summary = new CommittableSummary(0, 1, Long.valueOf(checkpointId), 1, 1, 0);
        this.output.collect((Object)new StreamRecord((Object)summary));
        CommittableWithLineage message = new CommittableWithLineage((Object)committable, Long.valueOf(checkpointId), 0);
        this.output.collect((Object)new StreamRecord((Object)message));
        LOG.info("Emitted commit message to downstream committer operator");
        this.results.clear();
    }

    public byte[] writeToManifest(Collection<WriteResult> writeResults, long checkpointId) throws IOException {
        if (writeResults.isEmpty()) {
            return EMPTY_MANIFEST_DATA;
        }
        WriteResult result = WriteResult.builder().addAll(writeResults).build();
        DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result, () -> this.icebergManifestOutputFileFactory.create(checkpointId), this.table.spec());
        return SimpleVersionedSerialization.writeVersionAndSerialize((SimpleVersionedSerializer)DeltaManifestsSerializer.INSTANCE, (Object)deltaManifests);
    }

    public void processElement(StreamRecord<CommittableMessage<WriteResult>> element) throws Exception {
        if (element.isRecord() && element.getValue() instanceof CommittableWithLineage) {
            this.results.add((WriteResult)((CommittableWithLineage)element.getValue()).getCommittable());
        }
    }
}

