/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.Objects;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;

public class IncrSourceHelper {
    private static String getStrictlyLowerTimestamp(String timestamp) {
        long ts = Long.parseLong(timestamp);
        ValidationUtils.checkArgument(ts > 0L, "Timestamp must be positive");
        long lower = ts - 1L;
        return "" + lower;
    }

    public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath, int numInstantsPerFetch, Option<String> beginInstant, boolean readLatestOnMissingBeginInstant) {
        ValidationUtils.checkArgument(numInstantsPerFetch > 0, "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
        HoodieTableMetaClient srcMetaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), srcBasePath, true);
        HoodieTimeline activeCommitTimeline = srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
        String beginInstantTime = beginInstant.orElseGet(() -> {
            if (readLatestOnMissingBeginInstant) {
                Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
                return lastInstant.map(hoodieInstant -> IncrSourceHelper.getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse("000");
            }
            throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest committed instant set hoodie.deltastreamer.source.hoodie.read_latest_on_midding_ckpt to true");
        });
        Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline.findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
        return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
    }

    public static void validateInstantTime(Row row, String instantTime, String sinceInstant, String endInstant) {
        Objects.requireNonNull(instantTime);
        ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN, sinceInstant), "Instant time(_hoodie_commit_time) in row (" + row + ") was : " + instantTime + "but expected to be between " + sinceInstant + "(excl) - " + endInstant + "(incl)");
        ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant), "Instant time(_hoodie_commit_time) in row (" + row + ") was : " + instantTime + "but expected to be between " + sinceInstant + "(excl) - " + endInstant + "(incl)");
    }
}

