/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineLayout;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamerCheckpointUtils {
    private static final Logger LOG = LoggerFactory.getLogger(StreamerCheckpointUtils.class);

    public static Option<Checkpoint> getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt, HoodieStreamer.Config streamerConfig, TypedProperties props) throws IOException {
        Option<Object> checkpoint = Option.empty();
        if (commitsTimelineOpt.isPresent()) {
            checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString(commitsTimelineOpt, streamerConfig, props);
        }
        LOG.debug("Checkpoint from config: " + streamerConfig.checkpoint);
        if (!checkpoint.isPresent() && streamerConfig.checkpoint != null) {
            int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
            checkpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion) ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
        }
        return checkpoint;
    }

    @VisibleForTesting
    static Option<Checkpoint> getCheckpointToResumeString(Option<HoodieTimeline> commitsTimelineOpt, HoodieStreamer.Config streamerConfig, TypedProperties props) throws IOException {
        Option<HoodieInstant> lastCommit;
        Option<Checkpoint> resumeCheckpoint = Option.empty();
        HoodieTimeline deltaCommitTimeline = commitsTimelineOpt.get().filter(instant -> instant.getAction().equals("deltacommit"));
        if (streamerConfig.tableType.equals(HoodieTableType.MERGE_ON_READ.name()) && !deltaCommitTimeline.empty()) {
            commitsTimelineOpt = Option.of(deltaCommitTimeline);
        }
        if ((lastCommit = commitsTimelineOpt.get().lastInstant()).isPresent()) {
            Option<HoodieCommitMetadata> commitMetadataOption = StreamerCheckpointUtils.getLatestCommitMetadataWithValidCheckpointInfo(commitsTimelineOpt.get());
            int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
            if (commitMetadataOption.isPresent()) {
                HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
                Checkpoint checkpointFromCommit = CheckpointUtils.getCheckpoint(commitMetadata);
                LOG.debug("Checkpoint reset from metadata: " + checkpointFromCommit.getCheckpointResetKey());
                if (streamerConfig.ignoreCheckpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointIgnoreKey()) || !streamerConfig.ignoreCheckpoint.equals(checkpointFromCommit.getCheckpointIgnoreKey()))) {
                    resumeCheckpoint = Option.empty();
                } else if (streamerConfig.checkpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey()) || !streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey()))) {
                    resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion) ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
                } else if (!StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey())) {
                    resumeCheckpoint = Option.of(checkpointFromCommit);
                } else if (InstantComparison.compareTimestamps("00000000000002", InstantComparison.LESSER_THAN, lastCommit.get().requestedTime())) {
                    throw new HoodieStreamerException("Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :" + commitsTimelineOpt.get().getInstants());
                }
                if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.reset_key"))) {
                    ConfigUtils.removeConfigFromProps(props, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE);
                }
            } else if (streamerConfig.checkpoint != null) {
                resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion) ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
            }
        }
        return resumeCheckpoint;
    }

    public static Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
        return timeline.getReverseOrderedInstants().map(instant -> {
            try {
                TimelineLayout layout = TimelineLayout.fromVersion(timeline.getTimelineLayoutVersion());
                HoodieCommitMetadata commitMetadata = layout.getCommitMetadataSerDe().deserialize((HoodieInstant)instant, timeline.getInstantDetails((HoodieInstant)instant).get(), HoodieCommitMetadata.class);
                if (!(StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.key")) && StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.reset_key")) && StringUtils.isNullOrEmpty(commitMetadata.getMetadata("streamer.checkpoint.key.v2")) && StringUtils.isNullOrEmpty(commitMetadata.getMetadata("streamer.checkpoint.reset.key.v2")))) {
                    return Option.of(Pair.of(instant.toString(), commitMetadata));
                }
                return Option.empty();
            }
            catch (IOException e) {
                throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
            }
        }).filter(Option::isPresent).findFirst().orElse(Option.empty());
    }

    public static Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
        return StreamerCheckpointUtils.getLatestInstantAndCommitMetadataWithValidCheckpointInfo(timeline).map(pair -> (HoodieCommitMetadata)pair.getRight());
    }

    public static Option<String> getLatestInstantWithValidCheckpointInfo(Option<HoodieTimeline> timelineOpt) {
        return timelineOpt.map(timeline -> {
            try {
                return StreamerCheckpointUtils.getLatestInstantAndCommitMetadataWithValidCheckpointInfo(timeline).map(pair -> (String)pair.getLeft());
            }
            catch (IOException e) {
                throw new HoodieIOException("failed to get latest instant with ValidCheckpointInfo", e);
            }
        }).orElse(Option.empty());
    }
}

