/*
 * Decompiled with CFR 0.152.
 */
package io.delta.flink.sink.internal.writer;

import io.delta.flink.sink.internal.committables.DeltaCommittable;
import io.delta.flink.sink.internal.writer.DeltaWriterBucketState;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkPartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaInProgressPart;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaPendingFile;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writer-side state for a single bucket: at most one in-progress part file plus the list of
 * part files that have been closed and are waiting to be turned into {@link DeltaCommittable}s.
 *
 * <p>Each closed part file bumps the {@value #RECORDS_WRITTEN_METRIC_NAME} and
 * {@value #BYTES_WRITTEN_METRIC_NAME} counters registered on the supplied {@link MetricGroup}.
 *
 * <p>Instances are created through {@link DeltaWriterBucketFactory}; both constructors are private.
 *
 * @param <IN> type of the elements written to the part files
 */
public class DeltaWriterBucket<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(DeltaWriterBucket.class);

    /** Name of the counter that accumulates the record count of every closed part file. */
    public static final String RECORDS_WRITTEN_METRIC_NAME = "DeltaSinkRecordsWritten";

    /** Name of the counter that accumulates the size of every closed part file. */
    public static final String BYTES_WRITTEN_METRIC_NAME = "DeltaSinkBytesWritten";

    private final String bucketId;
    private final Path bucketPath;
    private final OutputFileConfig outputFileConfig;

    /** Random UUID generated per bucket instance; embedded in every part file name. */
    private final String uniqueId;

    private final DeltaBulkBucketWriter<IN, String> bucketWriter;
    private final CheckpointRollingPolicy<IN, String> rollingPolicy;

    /** Part files that were closed but not yet handed out by {@link #prepareCommit}. */
    private final List<DeltaPendingFile> pendingFiles = new ArrayList<>();

    /** Partition spec extracted from {@link #bucketPath} at construction time. */
    private final LinkedHashMap<String, String> partitionSpec;

    /** Sequence number appended to part file names; incremented for every new part path. */
    private long partCounter;

    /** Number of records written to the current in-progress part; reset when it is closed. */
    private long inProgressPartRecordCount;

    /** The part file currently being written, or {@code null} when none is open. */
    @Nullable
    private DeltaInProgressPart<IN> deltaInProgressPart;

    private final Counter recordsWrittenCounter;
    private final Counter bytesWrittenCounter;

    /**
     * Creates a fresh bucket with no in-progress part and no pending files.
     *
     * @throws NullPointerException if any of the first five arguments is {@code null}
     */
    private DeltaWriterBucket(
            String bucketId,
            Path bucketPath,
            DeltaBulkBucketWriter<IN, String> bucketWriter,
            CheckpointRollingPolicy<IN, String> rollingPolicy,
            OutputFileConfig outputFileConfig,
            MetricGroup metricGroup) {
        this.bucketId = Preconditions.checkNotNull(bucketId);
        this.bucketPath = Preconditions.checkNotNull(bucketPath);
        this.bucketWriter = Preconditions.checkNotNull(bucketWriter);
        this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
        this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
        this.partitionSpec = PartitionPathUtils.extractPartitionSpecFromPath(this.bucketPath);
        this.uniqueId = UUID.randomUUID().toString();
        this.partCounter = 0L;
        this.inProgressPartRecordCount = 0L;
        this.recordsWrittenCounter = metricGroup.counter(RECORDS_WRITTEN_METRIC_NAME);
        this.bytesWrittenCounter = metricGroup.counter(BYTES_WRITTEN_METRIC_NAME);
    }

    /**
     * Creates a bucket from a snapshot. Only the bucket id and bucket path are taken from the
     * state; everything else starts out as in a fresh bucket.
     */
    private DeltaWriterBucket(
            DeltaBulkBucketWriter<IN, String> bucketWriter,
            CheckpointRollingPolicy<IN, String> rollingPolicy,
            DeltaWriterBucketState bucketState,
            OutputFileConfig outputFileConfig,
            MetricGroup metricGroup) {
        this(
            bucketState.getBucketId(),
            bucketState.getBucketPath(),
            bucketWriter,
            rollingPolicy,
            outputFileConfig,
            metricGroup);
    }

    /**
     * Closes the in-progress part (if any) and converts all pending files into committables.
     * The internal pending-file list is emptied afterwards.
     *
     * @param bl     when {@code true} the in-progress part is closed even if the rolling policy
     *               does not ask for a roll on checkpoint
     * @param string forwarded unchanged to every {@link DeltaCommittable}
     *               (presumably the application id - confirm against {@code DeltaCommittable})
     * @param l      forwarded unchanged to every {@link DeltaCommittable}
     *               (presumably the checkpoint id - confirm against {@code DeltaCommittable})
     * @return one committable per pending file, in the order the files were closed
     * @throws RuntimeException if a part is in progress, the rolling policy refuses to roll on
     *                          checkpoint and {@code bl} is {@code false}
     * @throws IOException      if closing the in-progress part fails
     */
    List<DeltaCommittable> prepareCommit(boolean bl, String string, long l) throws IOException {
        if (this.deltaInProgressPart != null) {
            // The policy is consulted first, exactly as before, then the force flag.
            boolean mustClose =
                this.rollingPolicy.shouldRollOnCheckpoint(this.deltaInProgressPart.getBulkPartWriter())
                    || bl;
            if (!mustClose) {
                throw new RuntimeException("Unexpected behaviour. Delta writers should always roll part files on checkpoint. To resolve this issue verify behaviour of your rolling policy.");
            }
            LOG.debug("Closing in-progress part file for bucket id={} on checkpoint.", this.bucketId);
            this.closePartFile();
        }

        List<DeltaCommittable> committables = new ArrayList<>(this.pendingFiles.size());
        for (DeltaPendingFile pendingFile : this.pendingFiles) {
            committables.add(new DeltaCommittable(pendingFile, string, l));
        }
        this.pendingFiles.clear();
        return committables;
    }

    /**
     * Captures this bucket's id and path in a {@link DeltaWriterBucketState}.
     *
     * @param string forwarded as the third constructor argument of the state
     *               (presumably the application id - confirm against {@code DeltaWriterBucketState})
     */
    DeltaWriterBucketState snapshotState(String string) {
        return new DeltaWriterBucketState(this.bucketId, this.bucketPath, string);
    }

    /**
     * Closes the current part (if any) and opens a new in-progress file under a fresh part path.
     * The caller is responsible for storing the returned part in {@link #deltaInProgressPart}.
     *
     * @param currentTime timestamp handed to the bucket writer when opening the file
     */
    private DeltaInProgressPart<IN> rollPartFile(long currentTime) throws IOException {
        this.closePartFile();

        Path partFilePath = this.assembleNewPartPath();
        String partFileName = partFilePath.getName();

        LOG.debug("Opening new part file \"{}\" for bucket id={}.", partFileName, this.bucketId);
        DeltaBulkPartWriter<IN, String> partWriter =
            (DeltaBulkPartWriter<IN, String>) this.bucketWriter.openNewInProgressFile(
                this.bucketId, partFilePath, currentTime);
        LOG.debug("Successfully opened new part file \"{}\" for bucket id={}.", partFileName, this.bucketId);

        return new DeltaInProgressPart<>(partFileName, partWriter);
    }

    /**
     * Finalizes the in-progress part, if there is one: records it as a pending file, resets the
     * in-progress state and updates the metrics. Does nothing when no part is open.
     */
    private void closePartFile() throws IOException {
        if (this.deltaInProgressPart == null) {
            return;
        }

        DeltaBulkPartWriter<IN, String> partWriter = this.deltaInProgressPart.getBulkPartWriter();

        // The writer is closed before its size is read and before it is closed for commit;
        // this ordering is kept exactly as in the original code.
        partWriter.closeWriter();
        long fileSize = partWriter.getSize();
        InProgressFileWriter.PendingFileRecoverable recoverable = partWriter.closeForCommit();

        DeltaPendingFile pendingFile = new DeltaPendingFile(
            this.partitionSpec,
            this.deltaInProgressPart.getFileName(),
            recoverable,
            this.inProgressPartRecordCount,
            fileSize,
            partWriter.getLastUpdateTime());
        this.pendingFiles.add(pendingFile);

        this.deltaInProgressPart = null;
        this.inProgressPartRecordCount = 0L;

        this.recordsWrittenCounter.inc(pendingFile.getRecordCount());
        this.bytesWrittenCounter.inc(fileSize);
    }

    /**
     * Writes one element to the in-progress part, opening a new part first when none is open or
     * when the rolling policy requests a roll for this element.
     *
     * @param IN the element to write
     * @param l  timestamp passed both to the bucket writer (when a new part is opened) and to the
     *           part writer's {@code write} call (presumably the current processing time - confirm
     *           against the caller)
     */
    void write(IN IN, long l) throws IOException {
        boolean needsNewPart = this.deltaInProgressPart == null
            || this.rollingPolicy.shouldRollOnEvent(this.deltaInProgressPart.getBulkPartWriter(), IN);
        if (needsNewPart) {
            LOG.debug("Opening new part file for bucket id={} due to element {}.", this.bucketId, IN);
            this.deltaInProgressPart = this.rollPartFile(l);
        }
        this.deltaInProgressPart.getBulkPartWriter().write(IN, l);
        ++this.inProgressPartRecordCount;
    }

    /**
     * Absorbs the pending files of another bucket that points at the same bucket path.
     *
     * <p>The other bucket's in-progress part is closed first so that the records it holds become
     * a pending file and are included in what is copied over. This bucket's own in-progress part
     * is left untouched.
     *
     * @param deltaWriterBucket the bucket to merge into this one
     * @throws NullPointerException  if {@code deltaWriterBucket} is {@code null}
     * @throws IllegalStateException if the two buckets have different bucket paths
     * @throws IOException           if closing the other bucket's in-progress part fails
     */
    void merge(DeltaWriterBucket<IN> deltaWriterBucket) throws IOException {
        Preconditions.checkNotNull(deltaWriterBucket);
        Preconditions.checkState(Objects.equals(deltaWriterBucket.bucketPath, this.bucketPath));

        // Must be invoked on the *other* bucket: its in-progress data has to be finalized into
        // its pending list before that list is adopted here.
        deltaWriterBucket.closePartFile();
        this.pendingFiles.addAll(deltaWriterBucket.pendingFiles);

        LOG.debug("Merging buckets for bucket id={}", this.bucketId);
    }

    /**
     * @return {@code true} while this bucket has an open part file or at least one pending file
     */
    public boolean isActive() {
        return this.deltaInProgressPart != null || !this.pendingFiles.isEmpty();
    }

    /**
     * Closes the in-progress part if the rolling policy asks for a roll at the given time.
     *
     * @param l the current time, as handed to the rolling policy
     */
    void onProcessingTime(long l) throws IOException {
        if (this.deltaInProgressPart == null) {
            return;
        }
        DeltaBulkPartWriter<IN, String> partWriter = this.deltaInProgressPart.getBulkPartWriter();
        if (!this.rollingPolicy.shouldRollOnProcessingTime(partWriter, l)) {
            return;
        }
        // Guard kept on purpose: it avoids the extra writer calls and boxing when debug is off.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Bucket {} closing in-progress part file for part file id={} due to processing time rolling policy (in-progress file created @ {}, last updated @ {} and current time is {}).", this.bucketId, this.uniqueId, partWriter.getCreationTime(), partWriter.getLastUpdateTime(), l);
        }
        this.closePartFile();
    }

    /**
     * Builds the path of the next part file as
     * {@code <bucketPath>/<prefix>-<uniqueId>-<counter><suffix>} and advances the counter.
     */
    private Path assembleNewPartPath() {
        long currentPartCounter = this.partCounter++;
        String fileName = this.outputFileConfig.getPartPrefix()
            + '-' + this.uniqueId
            + '-' + currentPartCounter
            + this.outputFileConfig.getPartSuffix();
        return new Path(this.bucketPath, fileName);
    }

    /**
     * Disposes the writer of the in-progress part, if one is open. The in-progress part reference
     * itself is not cleared by this method.
     */
    void disposePartFile() {
        if (this.deltaInProgressPart != null) {
            this.deltaInProgressPart.getBulkPartWriter().dispose();
        }
    }

    /** Static factory methods for {@link DeltaWriterBucket}, whose constructors are private. */
    public static class DeltaWriterBucketFactory {

        /** Creates an empty bucket for the given id and path. */
        static <IN> DeltaWriterBucket<IN> getNewBucket(String string, Path path, DeltaBulkBucketWriter<IN, String> deltaBulkBucketWriter, CheckpointRollingPolicy<IN, String> checkpointRollingPolicy, OutputFileConfig outputFileConfig, MetricGroup metricGroup) {
            return new DeltaWriterBucket<>(string, path, deltaBulkBucketWriter, checkpointRollingPolicy, outputFileConfig, metricGroup);
        }

        /** Creates a bucket whose id and path are read from the given snapshot state. */
        static <IN> DeltaWriterBucket<IN> restoreBucket(DeltaBulkBucketWriter<IN, String> deltaBulkBucketWriter, CheckpointRollingPolicy<IN, String> checkpointRollingPolicy, DeltaWriterBucketState deltaWriterBucketState, OutputFileConfig outputFileConfig, MetricGroup metricGroup) {
            return new DeltaWriterBucket<>(deltaBulkBucketWriter, checkpointRollingPolicy, deltaWriterBucketState, outputFileConfig, metricGroup);
        }
    }
}

