/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.AbstractInnerTableScan;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.SnapshotManager;

public class InnerTableScanImpl
extends AbstractInnerTableScan {
    private final SnapshotManager snapshotManager;
    private final DefaultValueAssigner defaultValueAssigner;
    private StartingScanner startingScanner;
    private boolean hasNext;
    private Integer pushDownLimit;

    public InnerTableScanImpl(CoreOptions options, SnapshotReader snapshotReader, SnapshotManager snapshotManager, DefaultValueAssigner defaultValueAssigner) {
        super(options, snapshotReader);
        this.snapshotManager = snapshotManager;
        this.hasNext = true;
        this.defaultValueAssigner = defaultValueAssigner;
    }

    @Override
    public InnerTableScan withFilter(Predicate predicate) {
        this.snapshotReader.withFilter(this.defaultValueAssigner.handlePredicate(predicate));
        return this;
    }

    @Override
    public InnerTableScan withPartitionFilter(Map<String, String> partitionSpec) {
        this.snapshotReader.withPartitionFilter(partitionSpec);
        return this;
    }

    @Override
    public InnerTableScan withLimit(int limit) {
        this.pushDownLimit = limit;
        return this;
    }

    @Override
    public TableScan.Plan plan() {
        if (this.startingScanner == null) {
            this.startingScanner = this.createStartingScanner(false);
        }
        if (this.hasNext) {
            this.hasNext = false;
            StartingScanner.Result result = this.startingScanner.scan(this.snapshotReader);
            StartingScanner.Result limitedResult = this.applyPushDownLimit(result);
            return DataFilePlan.fromResult(limitedResult);
        }
        throw new EndOfScanException();
    }

    private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result) {
        if (this.pushDownLimit != null && result instanceof StartingScanner.ScannedResult) {
            long scannedRowCount = 0L;
            final SnapshotReader.Plan plan = ((StartingScanner.ScannedResult)result).plan();
            List<DataSplit> splits = plan.dataSplits();
            final ArrayList<DataSplit> limitedSplits = new ArrayList<DataSplit>();
            for (DataSplit dataSplit : splits) {
                long splitRowCount = this.getRowCountForSplit(dataSplit);
                limitedSplits.add(dataSplit);
                if ((scannedRowCount += splitRowCount) < (long)this.pushDownLimit.intValue()) continue;
                break;
            }
            SnapshotReader.Plan newPlan = new SnapshotReader.Plan(){

                @Override
                @Nullable
                public Long watermark() {
                    return plan.watermark();
                }

                @Override
                @Nullable
                public Long snapshotId() {
                    return plan.snapshotId();
                }

                @Override
                public List<Split> splits() {
                    return limitedSplits;
                }
            };
            return new StartingScanner.ScannedResult(newPlan);
        }
        return result;
    }

    private long getRowCountForSplit(DataSplit split) {
        if (split.convertToRawFiles().isPresent()) {
            return split.convertToRawFiles().get().stream().map(RawFile::rowCount).reduce(Long::sum).orElse(0L);
        }
        return 0L;
    }
}

