/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.AbstractDataTableRead;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
import org.apache.paimon.table.source.splitread.IncrementalChangelogReadProvider;
import org.apache.paimon.table.source.splitread.IncrementalDiffReadProvider;
import org.apache.paimon.table.source.splitread.MergeFileSplitReadProvider;
import org.apache.paimon.table.source.splitread.PrimaryKeyTableRawFileSplitReadProvider;
import org.apache.paimon.table.source.splitread.SplitReadProvider;
import org.apache.paimon.types.RowType;

public final class KeyValueTableRead
extends AbstractDataTableRead {
    private final List<SplitReadProvider> readProviders;
    @Nullable
    private RowType readType = null;
    private boolean forceKeepDelete = false;
    private Predicate predicate = null;
    private IOManager ioManager = null;
    @Nullable
    private TopN topN = null;
    @Nullable
    private Integer limit = null;

    public KeyValueTableRead(Supplier<MergeFileSplitRead> mergeReadSupplier, Supplier<RawFileSplitRead> batchRawReadSupplier, TableSchema schema) {
        super(schema);
        this.readProviders = Arrays.asList(new PrimaryKeyTableRawFileSplitReadProvider(batchRawReadSupplier, this::config), new MergeFileSplitReadProvider(mergeReadSupplier, this::config), new IncrementalChangelogReadProvider(mergeReadSupplier, this::config), new IncrementalDiffReadProvider(mergeReadSupplier, this::config));
    }

    private List<SplitRead<InternalRow>> initialized() {
        ArrayList<SplitRead<InternalRow>> readers = new ArrayList<SplitRead<InternalRow>>();
        for (SplitReadProvider readProvider : this.readProviders) {
            if (!readProvider.get().initialized()) continue;
            readers.add(readProvider.get().get());
        }
        return readers;
    }

    private void config(SplitRead<InternalRow> read) {
        if (this.forceKeepDelete) {
            read = read.forceKeepDelete();
        }
        if (this.readType != null) {
            read = read.withReadType(this.readType);
        }
        if (this.topN != null) {
            read = read.withTopN(this.topN);
        }
        if (this.limit != null) {
            read = read.withLimit(this.limit);
        }
        read.withFilter(this.predicate).withIOManager(this.ioManager);
    }

    @Override
    public void applyReadType(RowType readType) {
        this.initialized().forEach(r -> r.withReadType(readType));
        this.readType = readType;
    }

    @Override
    public InnerTableRead forceKeepDelete() {
        this.initialized().forEach(SplitRead::forceKeepDelete);
        this.forceKeepDelete = true;
        return this;
    }

    @Override
    protected InnerTableRead innerWithFilter(Predicate predicate) {
        this.initialized().forEach(r -> r.withFilter(predicate));
        this.predicate = predicate;
        return this;
    }

    @Override
    public InnerTableRead withTopN(TopN topN) {
        this.initialized().forEach(r -> r.withTopN(topN));
        this.topN = topN;
        return this;
    }

    @Override
    public InnerTableRead withLimit(int limit) {
        this.initialized().forEach(r -> r.withLimit(limit));
        this.limit = limit;
        return this;
    }

    @Override
    public TableRead withIOManager(IOManager ioManager) {
        this.initialized().forEach(r -> r.withIOManager(ioManager));
        this.ioManager = ioManager;
        return this;
    }

    @Override
    public RecordReader<InternalRow> reader(Split split) throws IOException {
        DataSplit dataSplit = (DataSplit)split;
        for (SplitReadProvider readProvider : this.readProviders) {
            if (!readProvider.match(dataSplit, this.forceKeepDelete)) continue;
            return readProvider.get().get().createReader(dataSplit);
        }
        throw new RuntimeException("Should not happen.");
    }

    public static RecordReader<InternalRow> unwrap(final RecordReader<KeyValue> reader) {
        return new RecordReader<InternalRow>(){

            @Override
            @Nullable
            public RecordReader.RecordIterator<InternalRow> readBatch() throws IOException {
                RecordReader.RecordIterator<KeyValue> batch = reader.readBatch();
                return batch == null ? null : new ValueContentRowDataRecordIterator(batch);
            }

            @Override
            public void close() throws IOException {
                reader.close();
            }
        };
    }

    @VisibleForTesting
    public IOManager ioManager() {
        return this.ioManager;
    }
}

