/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.ingestion;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.ingestion.HoodieIngestionException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HoodieIngestionService
extends HoodieAsyncService {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieIngestionService.class);
    protected HoodieIngestionConfig ingestionConfig;

    public HoodieIngestionService(HoodieIngestionConfig ingestionConfig) {
        this.ingestionConfig = ingestionConfig;
    }

    public void startIngestion() {
        if (this.ingestionConfig.getBoolean(HoodieIngestionConfig.INGESTION_IS_CONTINUOUS).booleanValue()) {
            LOG.info("Ingestion service starts running in continuous mode");
            this.start(this::onIngestionCompletes);
            try {
                this.waitForShutdown();
            }
            catch (Exception e) {
                throw new HoodieIngestionException("Ingestion service was shut down with exception.", e);
            }
            LOG.info("Ingestion service (continuous mode) has been shut down.");
        } else {
            LOG.info("Ingestion service starts running in run-once mode");
            this.ingestOnce();
            LOG.info("Ingestion service (run-once mode) has been shut down.");
        }
    }

    protected Pair<CompletableFuture, ExecutorService> startService() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        return Pair.of(CompletableFuture.supplyAsync(() -> {
            try {
                while (!this.isShutdownRequested()) {
                    long ingestionStartEpochMillis = System.currentTimeMillis();
                    this.ingestOnce();
                    boolean requested = this.requestShutdownIfNeeded((Option<HoodieData<WriteStatus>>)Option.empty());
                    if (requested) continue;
                    this.sleepBeforeNextIngestion(ingestionStartEpochMillis);
                }
            }
            finally {
                executor.shutdownNow();
            }
            return true;
        }, executor), (Object)executor);
    }

    public abstract void ingestOnce();

    protected boolean requestShutdownIfNeeded(Option<HoodieData<WriteStatus>> lastWriteStatus) {
        return false;
    }

    protected void sleepBeforeNextIngestion(long ingestionStartEpochMillis) {
        try {
            long minSyncInternalSeconds = this.ingestionConfig.getLongOrDefault(HoodieIngestionConfig.INGESTION_MIN_SYNC_INTERNAL_SECONDS);
            long sleepMs = minSyncInternalSeconds * 1000L - (System.currentTimeMillis() - ingestionStartEpochMillis);
            if (sleepMs > 0L) {
                LOG.info(String.format("Last ingestion took less than min sync interval: %d s; sleep for %.2f s", minSyncInternalSeconds, (double)sleepMs / 1000.0));
                Thread.sleep(sleepMs);
            }
        }
        catch (InterruptedException e) {
            throw new HoodieIngestionException("Ingestion service (continuous mode) was interrupted during sleep.", e);
        }
    }

    protected boolean onIngestionCompletes(boolean hasError) {
        return true;
    }

    public abstract Option<HoodieIngestionMetrics> getMetrics();

    public void close() {
        if (!this.isShutdown()) {
            this.shutdown(true);
        }
    }

    public static class HoodieIngestionConfig
    extends HoodieConfig {
        public static final ConfigProperty<Boolean> INGESTION_IS_CONTINUOUS = ConfigProperty.key((String)"hoodie.utilities.ingestion.is.continuous").defaultValue((Object)false).markAdvanced().withDocumentation("Indicate if the ingestion runs in a continuous loop.");
        public static final ConfigProperty<Integer> INGESTION_MIN_SYNC_INTERNAL_SECONDS = ConfigProperty.key((String)"hoodie.utilities.ingestion.min.sync.internal.seconds").defaultValue((Object)0).markAdvanced().withDocumentation("the minimum sync interval of each ingestion in continuous mode");

        public static Builder newBuilder() {
            return new Builder();
        }

        public static class Builder {
            private final HoodieIngestionConfig ingestionConfig = new HoodieIngestionConfig();

            public Builder isContinuous(boolean isContinuous) {
                this.ingestionConfig.setValue(INGESTION_IS_CONTINUOUS, String.valueOf(isContinuous));
                return this;
            }

            public Builder withMinSyncInternalSeconds(int minSyncInternalSeconds) {
                this.ingestionConfig.setValue(INGESTION_MIN_SYNC_INTERNAL_SECONDS, String.valueOf(minSyncInternalSeconds));
                return this;
            }

            public HoodieIngestionConfig build() {
                this.ingestionConfig.setDefaults(HoodieIngestionConfig.class.getName());
                return this.ingestionConfig;
            }
        }
    }
}

