/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.listener;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.listener.CommitListener;
import org.apache.paimon.flink.sink.listener.PartitionMarkDoneTrigger;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CommitListener} that reports the partitions touched by committed data to a
 * {@link PartitionMarkDoneTrigger} and then runs every configured
 * {@link PartitionMarkDoneAction} on the partitions the trigger returns as done.
 *
 * <p>Two modes are supported, selected by
 * {@link FlinkConnectorOptions.PartitionMarkDoneActionMode}: {@code WATERMARK} forwards the
 * committable watermarks to the trigger, any other mode uses the trigger's watermark-less API.
 */
public class PartitionMarkDoneListener implements CommitListener {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionMarkDoneListener.class);

    /** A committable carrying this identifier is treated by this listener as end of input. */
    private static final long END_INPUT_IDENTIFIER = Long.MAX_VALUE;

    private final InternalRowPartitionComputer partitionComputer;
    private final PartitionMarkDoneTrigger trigger;
    private final List<PartitionMarkDoneAction> actions;
    /**
     * When {@code true}, the partition of every commit message is tracked, even if the message
     * carries no new files and no index increment.
     */
    private final boolean waitCompaction;
    private final FlinkConnectorOptions.PartitionMarkDoneActionMode partitionMarkDoneActionMode;

    /**
     * Builds a listener for the given table, or returns an empty {@link Optional} when partition
     * mark-done does not apply (see {@link #disablePartitionMarkDone}).
     *
     * @param cl class loader handed to {@link PartitionMarkDoneAction#createActions}
     * @param isStreaming whether the job runs in streaming mode
     * @param isRestored whether the job was restored from state; forwarded to the trigger
     * @param stateStore operator state store; forwarded to the trigger
     * @param table the table being written
     * @return the listener, or empty if partition mark-done is disabled for this table/job
     * @throws Exception if creating the trigger or the actions fails
     */
    public static Optional<PartitionMarkDoneListener> create(ClassLoader cl, boolean isStreaming, boolean isRestored, OperatorStateStore stateStore, FileStoreTable table) throws Exception {
        CoreOptions coreOptions = table.coreOptions();
        Options options = coreOptions.toConfiguration();
        if (disablePartitionMarkDone(isStreaming, table, options)) {
            return Optional.empty();
        }

        InternalRowPartitionComputer partitionComputer = new InternalRowPartitionComputer(
                coreOptions.partitionDefaultName(),
                table.schema().logicalPartitionType(),
                table.partitionKeys().toArray(new String[0]),
                coreOptions.legacyPartitionName());
        PartitionMarkDoneTrigger trigger = PartitionMarkDoneTrigger.create(coreOptions, isRestored, stateStore);
        List<PartitionMarkDoneAction> actions = PartitionMarkDoneAction.createActions(cl, table, coreOptions);

        // Only primary-key tables using deletion vectors or the FIRST_ROW merge engine track
        // partitions unconditionally.
        // NOTE(review): presumably because their data only becomes readable after compaction - confirm.
        boolean waitCompaction = !table.primaryKeys().isEmpty()
                && (coreOptions.deletionVectorsEnabled()
                        || coreOptions.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW);

        return Optional.of(new PartitionMarkDoneListener(
                partitionComputer,
                trigger,
                actions,
                waitCompaction,
                options.get(FlinkConnectorOptions.PARTITION_MARK_DONE_MODE)));
    }

    /**
     * Decides whether partition mark-done must be skipped.
     *
     * <p>It is skipped when the job is not streaming and mark-done-when-end-input is off, when the
     * job is streaming and no idle-time-to-done is configured, or when the table has no partition
     * keys.
     */
    private static boolean disablePartitionMarkDone(boolean isStreaming, FileStoreTable table, Options options) {
        boolean partitionMarkDoneWhenEndInput = options.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT);
        if (!isStreaming && !partitionMarkDoneWhenEndInput) {
            return true;
        }
        Duration idleToDone = options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE);
        if (isStreaming && idleToDone == null) {
            return true;
        }
        return table.partitionKeys().isEmpty();
    }

    /**
     * Creates a listener from already-built collaborators.
     *
     * @param partitionComputer converts a partition row into its partition values
     * @param trigger decides which notified partitions are done
     * @param actions actions run, in list order, on every done partition
     * @param waitCompaction if {@code true}, track the partition of every commit message, even
     *     ones without new files or index changes
     * @param partitionMarkDoneActionMode selects watermark-based or watermark-less marking
     */
    public PartitionMarkDoneListener(InternalRowPartitionComputer partitionComputer, PartitionMarkDoneTrigger trigger, List<PartitionMarkDoneAction> actions, boolean waitCompaction, FlinkConnectorOptions.PartitionMarkDoneActionMode partitionMarkDoneActionMode) {
        this.partitionComputer = partitionComputer;
        this.trigger = trigger;
        this.actions = actions;
        this.waitCompaction = waitCompaction;
        this.partitionMarkDoneActionMode = partitionMarkDoneActionMode;
    }

    /** Dispatches the committables to the marking strategy selected by the configured mode. */
    @Override
    public void notifyCommittable(List<ManifestCommittable> committables) {
        if (partitionMarkDoneActionMode == FlinkConnectorOptions.PartitionMarkDoneActionMode.WATERMARK) {
            markDoneByWatermark(committables);
        } else {
            markDoneByProcessTime(committables);
        }
    }

    /**
     * Notifies the trigger of every tracked partition (without a watermark) and marks done
     * whatever partitions the trigger then reports.
     */
    private void markDoneByProcessTime(List<ManifestCommittable> committables) {
        // De-duplicate so that each partition is reported to the trigger once per call.
        HashSet<BinaryRow> partitions = new HashSet<>();
        boolean endInput = false;
        for (ManifestCommittable committable : committables) {
            for (CommitMessage commitMessage : committable.fileCommittables()) {
                CommitMessageImpl message = (CommitMessageImpl) commitMessage;
                if (shouldTrack(message)) {
                    partitions.add(message.partition());
                }
            }
            if (isEndInput(committable)) {
                endInput = true;
            }
        }

        for (BinaryRow partition : partitions) {
            trigger.notifyPartition(toPartitionPath(partition));
        }
        markDone(trigger.donePartitions(endInput), actions);
    }

    /**
     * Notifies the trigger of every tracked partition together with its highest watermark and
     * marks done whatever partitions the trigger reports for the highest watermark seen overall.
     */
    private void markDoneByWatermark(List<ManifestCommittable> committables) {
        Tuple2<Map<BinaryRow, Long>, Boolean> extractedWatermarks = extractPartitionWatermarks(committables);
        Map<BinaryRow, Long> partitionWatermarks = extractedWatermarks.f0;
        boolean endInput = extractedWatermarks.f1;

        Optional<Long> latestWatermark = partitionWatermarks.values().stream().max(Long::compareTo);
        if (!latestWatermark.isPresent()) {
            // NOTE(review): this also returns when endInput is true but no partition watermark was
            // extracted, so the trigger is not queried in that case - confirm this is intended.
            LOG.warn("No watermark found in this batch of committables, skip partition mark done.");
            return;
        }

        partitionWatermarks.forEach(
                (partition, watermark) -> trigger.notifyPartition(toPartitionPath(partition), watermark));
        markDone(trigger.donePartitions(endInput, latestWatermark.get(), true), actions);
    }

    /**
     * Collects, per tracked partition, the maximum watermark among the committables that touched
     * it, and detects whether any committable signals end of input.
     *
     * <p>Committables without a watermark contribute no partitions but are still checked for end
     * of input.
     *
     * @return {@code f0}: partition to max watermark; {@code f1}: whether end of input was seen
     */
    private Tuple2<Map<BinaryRow, Long>, Boolean> extractPartitionWatermarks(List<ManifestCommittable> committables) {
        boolean endInput = false;
        Map<BinaryRow, Long> partitionWatermarks = new HashMap<>();
        for (ManifestCommittable committable : committables) {
            Long watermark = committable.watermark();
            if (watermark != null) {
                for (CommitMessage commitMessage : committable.fileCommittables()) {
                    CommitMessageImpl message = (CommitMessageImpl) commitMessage;
                    if (shouldTrack(message)) {
                        // Keep the largest watermark seen for this partition.
                        partitionWatermarks.merge(message.partition(), watermark, Long::max);
                    }
                }
            }
            if (isEndInput(committable)) {
                endInput = true;
            }
        }
        return Tuple2.of(partitionWatermarks, endInput);
    }

    /**
     * Returns whether the partition of {@code message} should be reported to the trigger: always
     * when {@link #waitCompaction} is set, otherwise only if the message has a non-empty index
     * increment or new-files increment.
     */
    private boolean shouldTrack(CommitMessageImpl message) {
        return waitCompaction
                || !message.indexIncrement().isEmpty()
                || !message.newFilesIncrement().isEmpty();
    }

    /** Returns whether the committable carries the end-of-input identifier. */
    private static boolean isEndInput(ManifestCommittable committable) {
        return committable.identifier() == END_INPUT_IDENTIFIER;
    }

    /** Converts a partition row into the partition path string passed to the trigger. */
    private String toPartitionPath(BinaryRow partition) {
        return PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(partition));
    }

    /**
     * Runs every action, in list order, on every partition, in list order.
     *
     * @param partitions partition paths to mark done
     * @param actions actions to apply to each partition
     * @throws RuntimeException wrapping the first exception thrown by an action; remaining
     *     actions and partitions are not processed
     */
    public static void markDone(List<String> partitions, List<PartitionMarkDoneAction> actions) {
        for (String partition : partitions) {
            try {
                for (PartitionMarkDoneAction action : actions) {
                    action.markDone(partition);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** Delegates state snapshotting to the trigger. */
    @Override
    public void snapshotState() throws Exception {
        trigger.snapshotState();
    }

    /** Closes all actions via {@link IOUtils#closeAllQuietly}. */
    @Override
    public void close() throws IOException {
        IOUtils.closeAllQuietly(actions);
    }
}

