/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.enumerator.AbstractIcebergEnumerator;
import org.apache.iceberg.flink.source.enumerator.ContinuousEnumerationResult;
import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorPosition;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split enumerator that repeatedly asks a {@link ContinuousSplitPlanner} for new splits and
 * hands them to a {@link SplitAssigner}.
 *
 * <p>The last enumerated position is kept in an {@link AtomicReference}; it is read by
 * {@link #discoverSplits()} and {@link #snapshotState(long)} and written only by
 * {@link #processDiscoveredSplits(ContinuousEnumerationResult, Throwable)}.
 */
@Internal
public class ContinuousIcebergEnumerator
extends AbstractIcebergEnumerator {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousIcebergEnumerator.class);
    private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
    private final SplitAssigner assigner;
    private final ScanContext scanContext;
    private final ContinuousSplitPlanner splitPlanner;
    /** Position up to which splits have already been handed to the assigner; may hold {@code null}. */
    private final AtomicReference<IcebergEnumeratorPosition> enumeratorPosition;

    /**
     * Creates the enumerator.
     *
     * @param enumeratorContext context used to schedule the periodic split discovery
     * @param assigner receives the discovered splits and contributes its state to snapshots
     * @param scanContext supplies the monitor interval used as the discovery period
     * @param splitPlanner plans the splits following a given enumerator position
     * @param enumState previously snapshotted state, or {@code null}; when present, its last
     *     enumerated position becomes the initial position, otherwise the position starts as
     *     {@code null}
     */
    public ContinuousIcebergEnumerator(SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext, SplitAssigner assigner, ScanContext scanContext, ContinuousSplitPlanner splitPlanner, @Nullable IcebergEnumeratorState enumState) {
        super(enumeratorContext, assigner);
        this.enumeratorContext = enumeratorContext;
        this.assigner = assigner;
        this.scanContext = scanContext;
        this.splitPlanner = splitPlanner;
        // Diamond operator instead of a raw type keeps the reference fully type-checked.
        this.enumeratorPosition = new AtomicReference<>();
        if (enumState != null) {
            this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
        }
    }

    /**
     * Starts the parent enumerator and schedules split discovery with no initial delay, repeating
     * every {@code scanContext.monitorInterval()} milliseconds.
     */
    @Override
    public void start() {
        super.start();
        long discoveryPeriodMillis = this.scanContext.monitorInterval().toMillis();
        this.enumeratorContext.callAsync(this::discoverSplits, this::processDiscoveredSplits, 0L, discoveryPeriodMillis);
    }

    /**
     * Closes the split planner and then the parent enumerator.
     *
     * @throws IOException if closing the planner or the parent fails; when both fail, the
     *     exception raised by the parent is the one that propagates
     */
    @Override
    public void close() throws IOException {
        try {
            this.splitPlanner.close();
        } finally {
            // Run the parent's cleanup even when the planner fails to close.
            super.close();
        }
    }

    /**
     * Always returns {@code true}.
     *
     * <p>NOTE(review): presumably this tells the parent to keep readers waiting for future splits
     * instead of signalling the end of input; confirm against AbstractIcebergEnumerator.
     */
    @Override
    protected boolean shouldWaitForMoreSplits() {
        return true;
    }

    /**
     * Captures the current enumerator position together with the assigner state.
     *
     * @param checkpointId id of the checkpoint being taken; not used by this implementation
     * @return a new state object holding the current position and {@code assigner.state()}
     */
    public IcebergEnumeratorState snapshotState(long checkpointId) {
        return new IcebergEnumeratorState(this.enumeratorPosition.get(), this.assigner.state());
    }

    /** Asks the planner for the splits that follow the current enumerator position. */
    private ContinuousEnumerationResult discoverSplits() {
        return this.splitPlanner.planSplits(this.enumeratorPosition.get());
    }

    /**
     * Callback for a finished discovery run.
     *
     * <p>A result is accepted only when its starting position equals the current enumerator
     * position; otherwise it is skipped and only logged. An accepted result is forwarded to the
     * assigner and the enumerator position advances to the result's end position. A failed run is
     * logged and otherwise ignored.
     *
     * @param result the discovery result; only read when {@code error} is {@code null}
     * @param error the failure of the discovery run, or {@code null} on success
     */
    private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
        if (error != null) {
            LOG.error("Failed to discover new splits", error);
            return;
        }
        // Read the position once so the logged value is exactly the one that was compared.
        IcebergEnumeratorPosition currentPosition = this.enumeratorPosition.get();
        if (!Objects.equals(result.fromPosition(), currentPosition)) {
            LOG.info("Skip {} discovered splits because the scan starting position doesn't match the current enumerator position: enumerator position = {}, scan starting position = {}", result.splits().size(), currentPosition, result.fromPosition());
            return;
        }
        this.assigner.onDiscoveredSplits(result.splits());
        LOG.info("Added {} splits discovered between ({}, {}] to the assigner", result.splits().size(), result.fromPosition(), result.toPosition());
        // The position advances for every accepted result, whatever the number of splits.
        this.enumeratorPosition.set(result.toPosition());
        LOG.info("Update enumerator position to {}", result.toPosition());
    }
}

