/*
 * Decompiled with CFR 0.152.
 */
package io.delta.flink.source.internal.enumerator;

import io.delta.flink.internal.options.DeltaConnectorConfiguration;
import io.delta.flink.source.internal.DeltaSourceOptions;
import io.delta.flink.source.internal.enumerator.ContinuousDeltaSourceSplitEnumerator;
import io.delta.flink.source.internal.enumerator.SplitEnumeratorProvider;
import io.delta.flink.source.internal.enumerator.monitor.TableMonitor;
import io.delta.flink.source.internal.enumerator.processor.ActionProcessor;
import io.delta.flink.source.internal.enumerator.processor.ChangesProcessor;
import io.delta.flink.source.internal.enumerator.processor.ContinuousTableProcessor;
import io.delta.flink.source.internal.enumerator.processor.SnapshotAndChangesTableProcessor;
import io.delta.flink.source.internal.enumerator.processor.SnapshotProcessor;
import io.delta.flink.source.internal.file.AddFileEnumerator;
import io.delta.flink.source.internal.state.DeltaEnumeratorStateCheckpoint;
import io.delta.flink.source.internal.state.DeltaSourceSplit;
import io.delta.flink.source.internal.utils.SourceUtils;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

/**
 * {@link SplitEnumeratorProvider} that builds {@link ContinuousDeltaSourceSplitEnumerator}
 * instances and reports {@link Boundedness#CONTINUOUS_UNBOUNDED}.
 *
 * <p>An enumerator can be created either from scratch (see
 * {@link #createInitialStateEnumerator}) or from a previously stored
 * {@link DeltaEnumeratorStateCheckpoint} (see {@link #createEnumeratorForCheckpoint}).
 */
public class ContinuousSplitEnumeratorProvider implements SplitEnumeratorProvider {
    private final FileSplitAssigner.Provider splitAssignerProvider;
    private final AddFileEnumerator.Provider<DeltaSourceSplit> fileEnumeratorProvider;

    /**
     * @param provider factory for the split assigner handed to every created enumerator
     * @param provider2 factory for the file enumerators handed to the table processors
     */
    public ContinuousSplitEnumeratorProvider(FileSplitAssigner.Provider provider, AddFileEnumerator.Provider<DeltaSourceSplit> provider2) {
        this.splitAssignerProvider = provider;
        this.fileEnumeratorProvider = provider2;
    }

    /**
     * Creates an enumerator for a source that starts without checkpointed state.
     *
     * <p>The Delta log of the table at {@code path} is opened and the snapshot whose version is
     * stored under {@link DeltaSourceOptions#LOADED_SCHEMA_SNAPSHOT_VERSION} is loaded. The
     * split assigner starts with an empty list of splits.
     *
     * @param path location of the Delta table
     * @param configuration Hadoop configuration used to open the Delta log
     * @param splitEnumeratorContext Flink enumerator context passed on to the created objects
     * @param deltaConnectorConfiguration source options
     * @return a new continuous enumerator
     */
    public ContinuousDeltaSourceSplitEnumerator createInitialStateEnumerator(Path path, Configuration configuration, SplitEnumeratorContext<DeltaSourceSplit> splitEnumeratorContext, DeltaConnectorConfiguration deltaConnectorConfiguration) {
        DeltaLog deltaLog = openDeltaLog(configuration, path);
        long schemaSnapshotVersion = deltaConnectorConfiguration.getValue(DeltaSourceOptions.LOADED_SCHEMA_SNAPSHOT_VERSION).longValue();
        Snapshot snapshot = deltaLog.getSnapshotForVersionAsOf(schemaSnapshotVersion);
        ContinuousTableProcessor tableProcessor = this.createTableProcessor(path, splitEnumeratorContext, deltaConnectorConfiguration, deltaLog, snapshot);
        return new ContinuousDeltaSourceSplitEnumerator(path, tableProcessor, this.splitAssignerProvider.create(Collections.emptyList()), splitEnumeratorContext);
    }

    /**
     * Recreates an enumerator from a checkpoint.
     *
     * <p>The table path and the initial splits for the assigner are both taken from the
     * checkpoint.
     *
     * @param deltaEnumeratorStateCheckpoint state to restore from
     * @param configuration Hadoop configuration used to open the Delta log
     * @param splitEnumeratorContext Flink enumerator context passed on to the created objects
     * @param deltaConnectorConfiguration source options
     * @return a new continuous enumerator seeded with the checkpointed splits
     */
    public ContinuousDeltaSourceSplitEnumerator createEnumeratorForCheckpoint(DeltaEnumeratorStateCheckpoint<DeltaSourceSplit> deltaEnumeratorStateCheckpoint, Configuration configuration, SplitEnumeratorContext<DeltaSourceSplit> splitEnumeratorContext, DeltaConnectorConfiguration deltaConnectorConfiguration) {
        ContinuousTableProcessor tableProcessor = this.createTableProcessorFromCheckpoint(deltaEnumeratorStateCheckpoint, configuration, splitEnumeratorContext, deltaConnectorConfiguration);
        Collection<DeltaSourceSplit> restoredSplits = deltaEnumeratorStateCheckpoint.getSplits();
        // Generics are invariant, so the checkpointed collection has to be re-typed before it
        // can be handed to the assigner provider; the very same instance is passed through.
        return new ContinuousDeltaSourceSplitEnumerator(deltaEnumeratorStateCheckpoint.getDeltaTablePath(), tableProcessor, this.splitAssignerProvider.create(uncheckedSplits(restoredSplits)), splitEnumeratorContext);
    }

    /**
     * Builds the table processor for a restored enumerator.
     *
     * <p>When the checkpoint reports that it was already monitoring for changes, only a
     * {@link ChangesProcessor} is created, using the checkpointed snapshot version. Otherwise
     * the snapshot for that version is loaded and a snapshot-and-changes processor is created.
     */
    private ContinuousTableProcessor createTableProcessorFromCheckpoint(DeltaEnumeratorStateCheckpoint<DeltaSourceSplit> checkpoint, Configuration hadoopConf, SplitEnumeratorContext<DeltaSourceSplit> enumContext, DeltaConnectorConfiguration sourceConfiguration) {
        long checkpointedVersion = checkpoint.getSnapshotVersion();
        Path tablePath = checkpoint.getDeltaTablePath();
        DeltaLog deltaLog = openDeltaLog(hadoopConf, tablePath);
        if (!checkpoint.isMonitoringForChanges()) {
            Snapshot snapshot = deltaLog.getSnapshotForVersionAsOf(checkpointedVersion);
            return this.createSnapshotAndChangesProcessor(tablePath, enumContext, sourceConfiguration, deltaLog, snapshot);
        }
        return this.createChangesProcessor(tablePath, enumContext, sourceConfiguration, deltaLog, checkpointedVersion);
    }

    /**
     * Builds the table processor for a fresh enumerator.
     *
     * <p>If a starting version or starting timestamp option is set, only changes are
     * processed and {@code snapshot.getVersion()} is handed to the changes processor.
     * Otherwise the snapshot content is processed first, followed by changes.
     */
    private ContinuousTableProcessor createTableProcessor(Path tablePath, SplitEnumeratorContext<DeltaSourceSplit> enumContext, DeltaConnectorConfiguration sourceConfiguration, DeltaLog deltaLog, Snapshot snapshot) {
        if (!this.isChangeStreamOnly(sourceConfiguration)) {
            return this.createSnapshotAndChangesProcessor(tablePath, enumContext, sourceConfiguration, deltaLog, snapshot);
        }
        return this.createChangesProcessor(tablePath, enumContext, sourceConfiguration, deltaLog, snapshot.getVersion());
    }

    /**
     * Creates a {@link ChangesProcessor} backed by a {@link TableMonitor}.
     *
     * <p>The monitor is configured with the ignore-changes, ignore-deletes and
     * update-check-interval options read from {@code sourceConfiguration}.
     *
     * @param monitorVersion table version passed to the {@link TableMonitor}
     */
    private ChangesProcessor createChangesProcessor(Path tablePath, SplitEnumeratorContext<DeltaSourceSplit> enumContext, DeltaConnectorConfiguration sourceConfiguration, DeltaLog deltaLog, long monitorVersion) {
        ActionProcessor actionProcessor = new ActionProcessor(
            sourceConfiguration.getValue(DeltaSourceOptions.IGNORE_CHANGES),
            sourceConfiguration.getValue(DeltaSourceOptions.IGNORE_DELETES));
        TableMonitor tableMonitor = new TableMonitor(
            deltaLog,
            monitorVersion,
            sourceConfiguration.getValue(DeltaSourceOptions.UPDATE_CHECK_INTERVAL),
            actionProcessor);
        return new ChangesProcessor(tablePath, tableMonitor, enumContext, this.fileEnumeratorProvider.create(), sourceConfiguration);
    }

    /**
     * Creates a processor that handles the given snapshot and then table changes.
     *
     * <p>The changes part is set up with the version directly after the snapshot version.
     */
    private ContinuousTableProcessor createSnapshotAndChangesProcessor(Path tablePath, SplitEnumeratorContext<DeltaSourceSplit> enumContext, DeltaConnectorConfiguration sourceConfiguration, DeltaLog deltaLog, Snapshot snapshot) {
        long firstChangeVersion = snapshot.getVersion() + 1L;
        ChangesProcessor changesProcessor = this.createChangesProcessor(tablePath, enumContext, sourceConfiguration, deltaLog, firstChangeVersion);
        // NOTE(review): the empty set is presumably the "already processed paths" argument;
        // confirm against the SnapshotProcessor constructor.
        SnapshotProcessor snapshotProcessor = new SnapshotProcessor(tablePath, snapshot, this.fileEnumeratorProvider.create(), Collections.emptySet());
        return new SnapshotAndChangesTableProcessor(snapshotProcessor, changesProcessor);
    }

    /** Always {@link Boundedness#CONTINUOUS_UNBOUNDED}. */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /**
     * Tells whether the source was configured with a starting version or a starting timestamp.
     */
    private boolean isChangeStreamOnly(DeltaConnectorConfiguration sourceConfiguration) {
        if (sourceConfiguration.hasOption(DeltaSourceOptions.STARTING_VERSION)) {
            return true;
        }
        return sourceConfiguration.hasOption(DeltaSourceOptions.STARTING_TIMESTAMP);
    }

    /** Opens the Delta log for the table stored at {@code tablePath}. */
    private static DeltaLog openDeltaLog(Configuration hadoopConf, Path tablePath) {
        String tableLocation = SourceUtils.pathToString(tablePath);
        return DeltaLog.forTable(hadoopConf, tableLocation);
    }

    /**
     * Re-types a collection of splits to the element type expected at the call site.
     *
     * <p>Flink's {@code FileSplitAssigner.Provider#create} is declared for a collection of its
     * base file split type, while checkpoints hold a {@code Collection<DeltaSourceSplit>}.
     * This is a compile-time-only conversion: no copy is made and the same instance is
     * returned. It relies on every element being acceptable to the receiver
     * (NOTE(review): i.e. on {@code DeltaSourceSplit} being a subtype of that base type).
     */
    @SuppressWarnings("unchecked")
    private static <T> Collection<T> uncheckedSplits(Collection<?> splits) {
        return (Collection<T>) splits;
    }
}

