/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.dynamic;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
import org.apache.iceberg.flink.sink.FlinkManifestUtil;
import org.apache.iceberg.flink.sink.ManifestOutputFileFactory;
import org.apache.iceberg.flink.sink.dynamic.DynamicCommittable;
import org.apache.iceberg.flink.sink.dynamic.DynamicWriteResult;
import org.apache.iceberg.flink.sink.dynamic.WriteTarget;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.Cache;
import org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DynamicWriteResultAggregator
extends AbstractStreamOperator<CommittableMessage<DynamicCommittable>>
implements OneInputStreamOperator<CommittableMessage<DynamicWriteResult>, CommittableMessage<DynamicCommittable>> {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteResultAggregator.class);
    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
    private static final Duration CACHE_EXPIRATION_DURATION = Duration.ofMinutes(1L);
    private final CatalogLoader catalogLoader;
    private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
    private transient Cache<String, Map<Integer, PartitionSpec>> specs;
    private transient Cache<String, ManifestOutputFileFactory> outputFileFactories;
    private transient String flinkJobId;
    private transient String operatorId;
    private transient int subTaskId;
    private transient int attemptId;
    private transient Catalog catalog;

    DynamicWriteResultAggregator(CatalogLoader catalogLoader) {
        this.catalogLoader = catalogLoader;
    }

    public void open() throws Exception {
        this.flinkJobId = this.getContainingTask().getEnvironment().getJobID().toString();
        this.operatorId = this.getOperatorID().toString();
        this.subTaskId = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        this.attemptId = this.getRuntimeContext().getTaskInfo().getAttemptNumber();
        this.results = Maps.newHashMap();
        this.specs = Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
        this.outputFileFactories = Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
        this.catalog = this.catalogLoader.loadCatalog();
    }

    public void finish() throws IOException {
        this.prepareSnapshotPreBarrier(Long.MAX_VALUE);
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
        HashSet<CommittableWithLineage> committables = Sets.newHashSetWithExpectedSize(this.results.size());
        int count = 0;
        for (Map.Entry<WriteTarget, Collection<DynamicWriteResult>> entries : this.results.entrySet()) {
            committables.add(new CommittableWithLineage((Object)new DynamicCommittable(entries.getKey(), this.writeToManifest(entries.getKey(), entries.getValue(), checkpointId), this.getContainingTask().getEnvironment().getJobID().toString(), this.getRuntimeContext().getOperatorUniqueID(), checkpointId), checkpointId, count));
            ++count;
        }
        this.output.collect((Object)new StreamRecord((Object)new CommittableSummary(this.subTaskId, count, checkpointId, count, count, 0)));
        committables.forEach(c -> this.output.collect((Object)new StreamRecord((Object)new CommittableWithLineage((Object)((DynamicCommittable)c.getCommittable()), checkpointId, this.subTaskId))));
        LOG.info("Emitted {} commit message to downstream committer operator", (Object)count);
        this.results.clear();
    }

    @VisibleForTesting
    byte[] writeToManifest(WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId) throws IOException {
        if (writeResults.isEmpty()) {
            return EMPTY_MANIFEST_DATA;
        }
        WriteResult.Builder builder = WriteResult.builder();
        writeResults.forEach(w -> builder.add(w.writeResult()));
        WriteResult result = builder.build();
        DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result, () -> this.outputFileFactory(key.tableName()).create(checkpointId), this.spec(key.tableName(), key.specId()));
        return SimpleVersionedSerialization.writeVersionAndSerialize((SimpleVersionedSerializer)DeltaManifestsSerializer.INSTANCE, (Object)deltaManifests);
    }

    public void processElement(StreamRecord<CommittableMessage<DynamicWriteResult>> element) throws Exception {
        if (element.isRecord() && element.getValue() instanceof CommittableWithLineage) {
            DynamicWriteResult result = (DynamicWriteResult)((CommittableWithLineage)element.getValue()).getCommittable();
            WriteTarget key = result.key();
            this.results.computeIfAbsent(key, unused -> Sets.newHashSet()).add(result);
        }
    }

    private ManifestOutputFileFactory outputFileFactory(String tableName) {
        return this.outputFileFactories.get(tableName, unused -> {
            Table table = this.catalog.loadTable(TableIdentifier.parse(tableName));
            this.specs.put(tableName, table.specs());
            return FlinkManifestUtil.createOutputFileFactory(() -> table, table.properties(), this.flinkJobId, this.operatorId, this.subTaskId, this.attemptId);
        });
    }

    private PartitionSpec spec(String tableName, int specId) {
        PartitionSpec spec;
        Map<Integer, PartitionSpec> knownSpecs = this.specs.getIfPresent(tableName);
        if (knownSpecs != null && (spec = knownSpecs.get(specId)) != null) {
            return spec;
        }
        Table table = this.catalog.loadTable(TableIdentifier.parse(tableName));
        return table.specs().get(specId);
    }
}

