/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.snapshot.AbstractStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

public class IncrementalStartingScanner
extends AbstractStartingScanner {
    private long endingSnapshotId;
    private ScanMode scanMode;

    public IncrementalStartingScanner(SnapshotManager snapshotManager, long start, long end, ScanMode scanMode) {
        super(snapshotManager);
        this.startingSnapshotId = start;
        this.endingSnapshotId = end;
        this.scanMode = scanMode;
    }

    @Override
    public StartingScanner.Result scan(SnapshotReader reader) {
        HashMap<Pair, List> grouped = new HashMap<Pair, List>();
        for (long i = this.startingSnapshotId + 1L; i < this.endingSnapshotId + 1L; ++i) {
            List<DataSplit> splits = this.readSplits(reader, this.snapshotManager.snapshot(i));
            for (DataSplit split : splits) {
                grouped.computeIfAbsent(Pair.of(split.partition(), split.bucket()), k -> new ArrayList()).addAll(split.dataFiles());
            }
        }
        final ArrayList<DataSplit> result = new ArrayList<DataSplit>();
        for (Map.Entry entry : grouped.entrySet()) {
            BinaryRow partition = (BinaryRow)((Pair)entry.getKey()).getLeft();
            int bucket = (Integer)((Pair)entry.getKey()).getRight();
            for (List<DataFileMeta> files : reader.splitGenerator().splitForBatch((List)entry.getValue())) {
                result.add(DataSplit.builder().withSnapshot(this.endingSnapshotId).withPartition(partition).withBucket(bucket).withDataFiles(files).build());
            }
        }
        return StartingScanner.fromPlan(new SnapshotReader.Plan(){

            @Override
            public Long watermark() {
                return null;
            }

            @Override
            public Long snapshotId() {
                return IncrementalStartingScanner.this.endingSnapshotId;
            }

            @Override
            public List<Split> splits() {
                return result;
            }
        });
    }

    private List<DataSplit> readSplits(SnapshotReader reader, Snapshot s) {
        switch (this.scanMode) {
            case CHANGELOG: {
                return this.readChangeLogSplits(reader, s);
            }
            case DELTA: {
                return this.readDeltaSplits(reader, s);
            }
        }
        throw new UnsupportedOperationException("Unsupported scan kind: " + (Object)((Object)this.scanMode));
    }

    private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot s) {
        if (s.commitKind() != Snapshot.CommitKind.APPEND) {
            return Collections.emptyList();
        }
        return reader.withSnapshot(s).withMode(ScanMode.DELTA).read().splits();
    }

    private List<DataSplit> readChangeLogSplits(SnapshotReader reader, Snapshot s) {
        if (s.commitKind() == Snapshot.CommitKind.OVERWRITE) {
            return Collections.emptyList();
        }
        return reader.withSnapshot(s).withMode(ScanMode.CHANGELOG).read().splits();
    }
}

