/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.partition;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;

public class PartitionMarkDone
implements Closeable {
    private final InternalRowPartitionComputer partitionComputer;
    private final PartitionMarkDoneTrigger trigger;
    private final List<PartitionMarkDoneAction> actions;
    private final boolean waitCompaction;

    @Nullable
    public static PartitionMarkDone create(boolean isStreaming, boolean isRestored, OperatorStateStore stateStore, FileStoreTable table) throws Exception {
        CoreOptions coreOptions = table.coreOptions();
        Options options = coreOptions.toConfiguration();
        if (PartitionMarkDone.disablePartitionMarkDone(isStreaming, table, options)) {
            return null;
        }
        InternalRowPartitionComputer partitionComputer = new InternalRowPartitionComputer(coreOptions.partitionDefaultName(), table.schema().logicalPartitionType(), table.partitionKeys().toArray(new String[0]));
        PartitionMarkDoneTrigger trigger = PartitionMarkDoneTrigger.create(coreOptions, isRestored, stateStore);
        List<PartitionMarkDoneAction> actions = PartitionMarkDoneAction.createActions(table, coreOptions);
        boolean waitCompaction = !table.primaryKeys().isEmpty() && (coreOptions.deletionVectorsEnabled() || coreOptions.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW);
        return new PartitionMarkDone(partitionComputer, trigger, actions, waitCompaction);
    }

    private static boolean disablePartitionMarkDone(boolean isStreaming, FileStoreTable table, Options options) {
        boolean partitionMarkDoneWhenEndInput = options.get(FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT);
        if (!isStreaming && !partitionMarkDoneWhenEndInput) {
            return true;
        }
        Duration idleToDone = options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE);
        if (isStreaming && idleToDone == null) {
            return true;
        }
        return table.partitionKeys().isEmpty();
    }

    public PartitionMarkDone(InternalRowPartitionComputer partitionComputer, PartitionMarkDoneTrigger trigger, List<PartitionMarkDoneAction> actions, boolean waitCompaction) {
        this.partitionComputer = partitionComputer;
        this.trigger = trigger;
        this.actions = actions;
        this.waitCompaction = waitCompaction;
    }

    public void notifyCommittable(List<ManifestCommittable> committables) {
        HashSet<BinaryRow> partitions = new HashSet<BinaryRow>();
        boolean endInput = false;
        for (ManifestCommittable committable : committables) {
            for (CommitMessage commitMessage : committable.fileCommittables()) {
                CommitMessageImpl message = (CommitMessageImpl)commitMessage;
                if (!this.waitCompaction && message.indexIncrement().isEmpty() && message.newFilesIncrement().isEmpty()) continue;
                partitions.add(message.partition());
            }
            if (committable.identifier() != Long.MAX_VALUE) continue;
            endInput = true;
        }
        partitions.stream().map(this.partitionComputer::generatePartValues).map(PartitionPathUtils::generatePartitionPath).forEach(this.trigger::notifyPartition);
        PartitionMarkDone.markDone(this.trigger.donePartitions(endInput), this.actions);
    }

    public static void markDone(List<String> partitions, List<PartitionMarkDoneAction> actions) {
        for (String partition : partitions) {
            try {
                for (PartitionMarkDoneAction action : actions) {
                    action.markDone(partition);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void snapshotState() throws Exception {
        this.trigger.snapshotState();
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeAllQuietly(this.actions);
    }
}

