/*
 * Decompiled with CFR 0.152.
 */
package io.delta.flink.source.internal.enumerator.processor;

import io.delta.flink.internal.options.DeltaConnectorConfiguration;
import io.delta.flink.source.internal.DeltaSourceOptions;
import io.delta.flink.source.internal.enumerator.monitor.ChangesPerVersion;
import io.delta.flink.source.internal.enumerator.monitor.TableMonitor;
import io.delta.flink.source.internal.enumerator.monitor.TableMonitorResult;
import io.delta.flink.source.internal.enumerator.processor.ContinuousTableProcessor;
import io.delta.flink.source.internal.enumerator.processor.TableProcessorBase;
import io.delta.flink.source.internal.exceptions.DeltaSourceException;
import io.delta.flink.source.internal.exceptions.DeltaSourceExceptions;
import io.delta.flink.source.internal.file.AddFileEnumerator;
import io.delta.flink.source.internal.state.DeltaEnumeratorStateCheckpointBuilder;
import io.delta.flink.source.internal.state.DeltaSourceSplit;
import io.delta.flink.source.internal.utils.SourceUtils;
import io.delta.standalone.actions.AddFile;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ContinuousTableProcessor} that schedules a {@link TableMonitor} on the enumerator
 * context and converts every {@link ChangesPerVersion} reported by it into
 * {@link DeltaSourceSplit}s, which are handed to the callback given to
 * {@link #process(Consumer)}.
 *
 * <p>The processor remembers a snapshot version: it starts at the monitor's version and is
 * advanced to {@code version + 1} each time the changes of a version are processed.
 */
public class ChangesProcessor extends TableProcessorBase implements ContinuousTableProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(ChangesProcessor.class);

    /** Task submitted to the enumerator context to discover table changes. */
    private final TableMonitor tableMonitor;

    /** Enumerator context used to schedule {@link #tableMonitor}. */
    private final SplitEnumeratorContext<DeltaSourceSplit> enumContext;

    /** Value of {@link DeltaSourceOptions#UPDATE_CHECK_INTERVAL}; passed as the scheduling period. */
    private final long checkInterval;

    /** Value of {@link DeltaSourceOptions#UPDATE_CHECK_INITIAL_DELAY}; passed as the first delay. */
    private final long initialDelay;

    /**
     * Seeded from {@link TableMonitor#getMonitorVersion()} and then set to the last processed
     * version plus one. NOTE(review): presumably the next version to look for changes in.
     */
    private long currentSnapshotVersion;

    /**
     * Creates a processor for the table at {@code path}.
     *
     * @param path location of the Delta table, forwarded to {@link TableProcessorBase}
     * @param tableMonitor monitor that is scheduled by {@link #process(Consumer)}; also supplies
     *     the initial snapshot version
     * @param splitEnumeratorContext context on which the monitor is scheduled
     * @param addFileEnumerator enumerator forwarded to {@link TableProcessorBase}
     * @param deltaConnectorConfiguration configuration providing the update check interval and
     *     the initial delay
     */
    public ChangesProcessor(Path path, TableMonitor tableMonitor, SplitEnumeratorContext<DeltaSourceSplit> splitEnumeratorContext, AddFileEnumerator<DeltaSourceSplit> addFileEnumerator, DeltaConnectorConfiguration deltaConnectorConfiguration) {
        super(path, addFileEnumerator);
        this.tableMonitor = tableMonitor;
        this.enumContext = splitEnumeratorContext;
        this.currentSnapshotVersion = this.tableMonitor.getMonitorVersion();
        this.checkInterval = deltaConnectorConfiguration.getValue(DeltaSourceOptions.UPDATE_CHECK_INTERVAL);
        this.initialDelay = deltaConnectorConfiguration.getValue(DeltaSourceOptions.UPDATE_CHECK_INITIAL_DELAY);
    }

    /**
     * Schedules the table monitor on the enumerator context, using the configured initial delay
     * and check interval, and routes each outcome to {@code consumer}.
     *
     * @param consumer callback that receives the splits prepared for every discovered version
     */
    @Override
    public void process(Consumer<List<DeltaSourceSplit>> consumer) {
        // The monitor is passed with its own type (no raw Callable cast) so that the handler's
        // arguments are inferred by the compiler instead of being downcast by hand.
        this.enumContext.callAsync(
            this.tableMonitor,
            (tableMonitorResult, throwable) ->
                this.processDiscoveredVersions(tableMonitorResult, consumer, throwable),
            this.initialDelay,
            this.checkInterval);
    }

    /**
     * @return the snapshot version currently tracked by this processor
     */
    @Override
    public long getSnapshotVersion() {
        return this.currentSnapshotVersion;
    }

    /**
     * Records in the checkpoint builder that this processor monitors for changes.
     *
     * @param deltaEnumeratorStateCheckpointBuilder builder to update
     * @return the builder returned by {@code withMonitoringForChanges}
     */
    @Override
    public DeltaEnumeratorStateCheckpointBuilder<DeltaSourceSplit> snapshotState(DeltaEnumeratorStateCheckpointBuilder<DeltaSourceSplit> deltaEnumeratorStateCheckpointBuilder) {
        return deltaEnumeratorStateCheckpointBuilder.withMonitoringForChanges(this.isMonitoringForChanges());
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isMonitoringForChanges() {
        return true;
    }

    /**
     * Handles one outcome of the table monitor.
     *
     * <p>If {@code throwable} is set, it is logged and rethrown: a {@link DeltaSourceException}
     * is rethrown unchanged, anything else is wrapped by
     * {@link DeltaSourceExceptions#tableMonitorException} together with the table path.
     * Otherwise every reported version is processed in the order returned by the result.
     *
     * @param tableMonitorResult changes found by the monitor; only read when {@code throwable}
     *     is {@code null}
     * @param consumer callback that receives the prepared splits
     * @param throwable failure raised by the monitor, or {@code null} on success
     */
    private void processDiscoveredVersions(TableMonitorResult tableMonitorResult, Consumer<List<DeltaSourceSplit>> consumer, Throwable throwable) {
        if (throwable != null) {
            LOG.error("Failed to enumerate files", throwable);
            if (throwable instanceof DeltaSourceException) {
                throw (DeltaSourceException) throwable;
            }
            throw DeltaSourceExceptions.tableMonitorException(SourceUtils.pathToString(this.deltaTablePath), throwable);
        }
        tableMonitorResult.getChanges().forEach(changesPerVersion -> this.processVersion(consumer, changesPerVersion));
    }

    /**
     * Advances the tracked snapshot version past {@code changesPerVersion} and passes the splits
     * prepared from it to {@code consumer}.
     *
     * @param consumer callback that receives the prepared splits
     * @param changesPerVersion changes belonging to a single table version
     */
    private void processVersion(Consumer<List<DeltaSourceSplit>> consumer, ChangesPerVersion<AddFile> changesPerVersion) {
        // The version is taken from the reported changes and is advanced before the splits are
        // prepared and handed over.
        this.currentSnapshotVersion = changesPerVersion.getSnapshotVersion() + 1L;
        // The always-true predicate keeps every path.
        List<DeltaSourceSplit> list = this.prepareSplits(changesPerVersion, path -> true);
        consumer.accept(list);
    }
}

