/*
 * Decompiled with CFR 0.152.
 */
package io.delta.flink.sink.internal.writer;

import io.delta.flink.sink.internal.committables.DeltaCommittable;
import io.delta.flink.sink.internal.writer.DeltaWriterBucket;
import io.delta.flink.sink.internal.writer.DeltaWriterBucketState;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeltaWriter<IN>
implements SinkWriter<IN, DeltaCommittable, DeltaWriterBucketState>,
Sink.ProcessingTimeService.ProcessingTimeCallback {
    private static final Logger LOG = LoggerFactory.getLogger(DeltaWriter.class);
    public static final String RECORDS_OUT_METRIC_NAME = "DeltaSinkRecordsOut";
    public static final String NOOP_WRITER_STATE = "<noop-writer-state>";
    private final String appId;
    private long nextCheckpointId;
    private final DeltaBulkBucketWriter<IN, String> bucketWriter;
    private final CheckpointRollingPolicy<IN, String> rollingPolicy;
    private final Path basePath;
    private final BucketAssigner<IN, String> bucketAssigner;
    private final Sink.ProcessingTimeService processingTimeService;
    private final long bucketCheckInterval;
    private final Map<String, DeltaWriterBucket<IN>> activeBuckets;
    private final BucketerContext bucketerContext;
    private final OutputFileConfig outputFileConfig;
    private final MetricGroup metricGroup;
    private final Counter recordsOutCounter;

    public DeltaWriter(Path path, BucketAssigner<IN, String> bucketAssigner, DeltaBulkBucketWriter<IN, String> deltaBulkBucketWriter, CheckpointRollingPolicy<IN, String> checkpointRollingPolicy, OutputFileConfig outputFileConfig, Sink.ProcessingTimeService processingTimeService, MetricGroup metricGroup, long l, String string, long l2) {
        this.basePath = (Path)Preconditions.checkNotNull((Object)path);
        this.bucketAssigner = (BucketAssigner)Preconditions.checkNotNull(bucketAssigner);
        this.bucketWriter = (DeltaBulkBucketWriter)((Object)Preconditions.checkNotNull(deltaBulkBucketWriter));
        this.rollingPolicy = (CheckpointRollingPolicy)Preconditions.checkNotNull(checkpointRollingPolicy);
        this.outputFileConfig = (OutputFileConfig)Preconditions.checkNotNull((Object)outputFileConfig);
        this.activeBuckets = new HashMap<String, DeltaWriterBucket<IN>>();
        this.bucketerContext = new BucketerContext();
        this.processingTimeService = (Sink.ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
        this.metricGroup = metricGroup;
        this.recordsOutCounter = metricGroup.counter(RECORDS_OUT_METRIC_NAME);
        Preconditions.checkArgument((l > 0L ? 1 : 0) != 0, (Object)"Bucket checking interval for processing time should be positive.");
        this.bucketCheckInterval = l;
        this.appId = string;
        this.nextCheckpointId = l2;
    }

    public List<DeltaWriterBucketState> snapshotState() {
        Preconditions.checkState((this.bucketWriter != null ? 1 : 0) != 0, (Object)"sink has not been initialized");
        ArrayList<DeltaWriterBucketState> arrayList = new ArrayList<DeltaWriterBucketState>();
        for (DeltaWriterBucket<IN> deltaWriterBucket : this.activeBuckets.values()) {
            arrayList.add(deltaWriterBucket.snapshotState(this.appId));
        }
        if (arrayList.isEmpty()) {
            arrayList.add(new DeltaWriterBucketState(NOOP_WRITER_STATE, this.basePath, this.appId));
        }
        return arrayList;
    }

    private void incrementNextCheckpointId() {
        ++this.nextCheckpointId;
    }

    long getNextCheckpointId() {
        return this.nextCheckpointId;
    }

    public void write(IN IN, SinkWriter.Context context) throws IOException {
        this.bucketerContext.update(context.timestamp(), context.currentWatermark(), this.processingTimeService.getCurrentProcessingTime());
        String string = (String)this.bucketAssigner.getBucketId(IN, (BucketAssigner.Context)this.bucketerContext);
        DeltaWriterBucket<IN> deltaWriterBucket = this.getOrCreateBucketForBucketId(string);
        deltaWriterBucket.write(IN, this.processingTimeService.getCurrentProcessingTime());
        this.recordsOutCounter.inc();
    }

    public List<DeltaCommittable> prepareCommit(boolean bl) throws IOException {
        ArrayList<DeltaCommittable> arrayList = new ArrayList<DeltaCommittable>();
        Iterator<Map.Entry<String, DeltaWriterBucket<IN>>> iterator = this.activeBuckets.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, DeltaWriterBucket<IN>> entry = iterator.next();
            if (!entry.getValue().isActive()) {
                iterator.remove();
                continue;
            }
            arrayList.addAll(entry.getValue().prepareCommit(bl, this.appId, this.nextCheckpointId));
        }
        this.incrementNextCheckpointId();
        return arrayList;
    }

    public void initializeState(List<DeltaWriterBucketState> list) throws IOException {
        Preconditions.checkNotNull(list, (String)"The retrieved state was null.");
        for (DeltaWriterBucketState deltaWriterBucketState : list) {
            String string = deltaWriterBucketState.getBucketId();
            if (string.equals(NOOP_WRITER_STATE)) continue;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Restoring: {}", (Object)deltaWriterBucketState);
            }
            DeltaWriterBucket<IN> deltaWriterBucket = DeltaWriterBucket.DeltaWriterBucketFactory.restoreBucket(this.bucketWriter, this.rollingPolicy, deltaWriterBucketState, this.outputFileConfig, this.metricGroup);
            this.updateActiveBucketId(string, deltaWriterBucket);
        }
        this.registerNextBucketInspectionTimer();
    }

    private void updateActiveBucketId(String string, DeltaWriterBucket<IN> deltaWriterBucket) throws IOException {
        DeltaWriterBucket<IN> deltaWriterBucket2 = this.activeBuckets.get(string);
        if (deltaWriterBucket2 != null) {
            deltaWriterBucket2.merge(deltaWriterBucket);
        } else {
            this.activeBuckets.put(string, deltaWriterBucket);
        }
    }

    private DeltaWriterBucket<IN> getOrCreateBucketForBucketId(String string) {
        DeltaWriterBucket<IN> deltaWriterBucket = this.activeBuckets.get(string);
        if (deltaWriterBucket == null) {
            Path path = this.assembleBucketPath(string);
            deltaWriterBucket = DeltaWriterBucket.DeltaWriterBucketFactory.getNewBucket(string, path, this.bucketWriter, this.rollingPolicy, this.outputFileConfig, this.metricGroup);
            this.activeBuckets.put(string, deltaWriterBucket);
        }
        return deltaWriterBucket;
    }

    public void close() {
        if (this.activeBuckets != null) {
            this.activeBuckets.values().forEach(DeltaWriterBucket::disposePartFile);
        }
    }

    private Path assembleBucketPath(String string) {
        if ("".equals(string)) {
            return this.basePath;
        }
        return new Path(this.basePath, string);
    }

    public void onProcessingTime(long l) throws IOException {
        for (DeltaWriterBucket<IN> deltaWriterBucket : this.activeBuckets.values()) {
            deltaWriterBucket.onProcessingTime(l);
        }
        this.registerNextBucketInspectionTimer();
    }

    private void registerNextBucketInspectionTimer() {
        long l = this.processingTimeService.getCurrentProcessingTime() + this.bucketCheckInterval;
        this.processingTimeService.registerProcessingTimer(l, (Sink.ProcessingTimeService.ProcessingTimeCallback)this);
    }

    @VisibleForTesting
    Map<String, DeltaWriterBucket<IN>> getActiveBuckets() {
        return this.activeBuckets;
    }

    private static final class BucketerContext
    implements BucketAssigner.Context {
        @Nullable
        private Long elementTimestamp = null;
        private long currentWatermark = Long.MIN_VALUE;
        private long currentProcessingTime = Long.MIN_VALUE;

        private BucketerContext() {
        }

        void update(@Nullable Long l, long l2, long l3) {
            this.elementTimestamp = l;
            this.currentWatermark = l2;
            this.currentProcessingTime = l3;
        }

        public long currentProcessingTime() {
            return this.currentProcessingTime;
        }

        public long currentWatermark() {
            return this.currentWatermark;
        }

        @Nullable
        public Long timestamp() {
            return this.elementTimestamp;
        }
    }
}

