/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

/**
 * Operator that wraps a {@link CommitterOperator} and forwards every operator callback to it.
 *
 * <p>The only behavior added on top of the wrapped operator happens in {@link #finish()}: before
 * delegating, a tag named {@code batch-write-yyyy-MM-dd} (date derived from the latest snapshot's
 * commit time in the system default time zone) is created on the table's latest snapshot, and
 * tags are then trimmed down to the table's configured {@code tagNumRetainedMax}, if one is set.
 *
 * @param <CommitT> type of the records received and forwarded by the wrapped committer operator
 * @param <GlobalCommitT> second type parameter of the wrapped {@link CommitterOperator}
 */
public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT>
        implements OneInputStreamOperator<CommitT, CommitT>, BoundedOneInput {

    private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-";
    private static final long serialVersionUID = 1L;

    /**
     * Formatter for the date suffix of the tag name. {@link DateTimeFormatter} is immutable and
     * thread-safe, so one shared instance replaces the per-call construction.
     */
    private static final DateTimeFormatter TAG_DATE_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");

    private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;
    protected final FileStoreTable table;

    /**
     * Creates the wrapper.
     *
     * @param commitOperator operator that receives every delegated callback
     * @param table table whose latest snapshot is tagged when the operator finishes
     */
    public BatchWriteGeneratorTagOperator(
            CommitterOperator<CommitT, GlobalCommitT> commitOperator, FileStoreTable table) {
        this.table = table;
        this.commitOperator = commitOperator;
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
            throws Exception {
        this.commitOperator.initializeState(streamTaskStateManager);
    }

    /** Delegates to the wrapped committer operator and returns its snapshot futures. */
    @Override
    public OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory storageLocation)
            throws Exception {
        return this.commitOperator.snapshotState(
                checkpointId, timestamp, checkpointOptions, storageLocation);
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.commitOperator.notifyCheckpointComplete(checkpointId);
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        this.commitOperator.notifyCheckpointAborted(checkpointId);
    }

    /**
     * Tags the latest snapshot with {@code batch-write-<yyyy-MM-dd>} and then expires surplus
     * tags.
     *
     * <p>Does nothing when the table has no snapshot. An existing tag with the same name is
     * deleted first and re-created on the latest snapshot. If anything in that sequence
     * (including the expiration step) throws, the exception is swallowed and the tag, if it
     * exists at that point, is removed again.
     */
    private void createTag() {
        SnapshotManager snapshotManager = this.table.snapshotManager();
        Snapshot snapshot = snapshotManager.latestSnapshot();
        if (snapshot == null) {
            return;
        }
        TagManager tagManager = this.table.tagManager();
        TagDeletion tagDeletion = this.table.store().newTagDeletion();
        LocalDateTime snapshotTime =
                LocalDateTime.ofInstant(
                        Instant.ofEpochMilli(snapshot.timeMillis()), ZoneId.systemDefault());
        String tagName = BATCH_WRITE_TAG_PREFIX + snapshotTime.format(TAG_DATE_FORMATTER);
        try {
            // Replace a tag of the same name rather than failing on it.
            if (tagManager.tagExists(tagName)) {
                tagManager.deleteTag(
                        tagName,
                        tagDeletion,
                        snapshotManager,
                        this.table.store().createTagCallbacks());
            }
            tagManager.createTag(
                    snapshot,
                    tagName,
                    this.table.coreOptions().tagDefaultTimeRetained(),
                    this.table.store().createTagCallbacks());
            this.expireTag();
        } catch (Exception ignored) {
            // Deliberately best-effort: a tagging failure must not fail finish(). Roll back the
            // tag if it was (re-)created before the failure.
            // NOTE(review): the failure is dropped silently; consider logging it once a logger
            // is available in this class.
            if (tagManager.tagExists(tagName)) {
                tagManager.deleteTag(
                        tagName,
                        tagDeletion,
                        snapshotManager,
                        this.table.store().createTagCallbacks());
            }
        }
    }

    /**
     * Deletes tags until at most {@code tagNumRetainedMax} remain.
     *
     * <p>No-op when {@code tagNumRetainedMax} is not configured or the table has no snapshot.
     * Tags are visited snapshot by snapshot in the iteration order of {@code
     * tagManager.tags().values()}: a snapshot's tags are dropped as a whole while that still
     * leaves at least the limit; otherwise tags of that snapshot are deleted one at a time, in
     * the order given by {@code sortTagsOfOneSnapshot}, until the limit is reached.
     */
    private void expireTag() {
        Integer tagNumRetainedMax = this.table.coreOptions().tagNumRetainedMax();
        if (tagNumRetainedMax == null) {
            return;
        }
        SnapshotManager snapshotManager = this.table.snapshotManager();
        if (snapshotManager.latestSnapshot() == null) {
            return;
        }
        int maxRetained = tagNumRetainedMax;
        TagManager tagManager = this.table.tagManager();
        TagDeletion tagDeletion = this.table.store().newTagDeletion();
        long tagCount = tagManager.tagCount();

        while (tagCount > maxRetained) {
            long countBeforePass = tagCount;
            for (List<String> tagNames : tagManager.tags().values()) {
                // Stop as soon as the limit is met. Without this check, a whole-snapshot
                // deletion that lands exactly on the limit would fall through to the partial
                // branch on the next snapshot and delete tags that should be retained.
                if (tagCount <= maxRetained) {
                    break;
                }
                if (tagCount - tagNames.size() >= maxRetained) {
                    tagManager.deleteAllTagsOfOneSnapshot(tagNames, tagDeletion, snapshotManager);
                    tagCount -= tagNames.size();
                } else {
                    // Only part of this snapshot's tags may go: delete one by one.
                    for (String toBeDeleted : tagManager.sortTagsOfOneSnapshot(tagNames)) {
                        tagManager.deleteTag(
                                toBeDeleted,
                                tagDeletion,
                                snapshotManager,
                                this.table.store().createTagCallbacks());
                        tagCount--;
                        if (tagCount <= maxRetained) {
                            break;
                        }
                    }
                    break;
                }
            }
            // tagCount() and tags() are read separately; if a pass found nothing to delete,
            // repeating it would spin forever, so give up instead.
            if (tagCount == countBeforePass) {
                break;
            }
        }
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void open() throws Exception {
        this.commitOperator.open();
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void processElement(StreamRecord<CommitT> element) throws Exception {
        this.commitOperator.processElement(element);
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void processWatermark(Watermark watermark) throws Exception {
        this.commitOperator.processWatermark(watermark);
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        this.commitOperator.processWatermarkStatus(watermarkStatus);
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        this.commitOperator.processLatencyMarker(latencyMarker);
    }

    /**
     * Creates the batch-write tag and expires surplus tags, then delegates to the wrapped
     * committer operator's {@code finish()}.
     */
    @Override
    public void finish() throws Exception {
        this.createTag();
        this.commitOperator.finish();
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void close() throws Exception {
        this.commitOperator.close();
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.commitOperator.prepareSnapshotPreBarrier(checkpointId);
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
        this.commitOperator.setKeyContextElement1(record);
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
        this.commitOperator.setKeyContextElement2(record);
    }

    /** Returns the metric group of the wrapped committer operator. */
    @Override
    public OperatorMetricGroup getMetricGroup() {
        return this.commitOperator.getMetricGroup();
    }

    /** Returns the operator ID of the wrapped committer operator. */
    @Override
    public OperatorID getOperatorID() {
        return this.commitOperator.getOperatorID();
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void setCurrentKey(Object key) {
        this.commitOperator.setCurrentKey(key);
    }

    /** Returns the current key of the wrapped committer operator. */
    @Override
    public Object getCurrentKey() {
        return this.commitOperator.getCurrentKey();
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception {
        this.commitOperator.setKeyContextElement(record);
    }

    /** Delegates to the wrapped committer operator. */
    @Override
    public void endInput() throws Exception {
        this.commitOperator.endInput();
    }
}

