/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import lombok.NonNull;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.data.DefaultDeserializer;
import org.apache.seatunnel.connectors.seatunnel.iceberg.data.Deserializer;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader.IcebergFileScanTaskReader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader.IcebergFileScanTaskSplitReader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergSourceReader
implements SourceReader<SeaTunnelRow, IcebergFileScanTaskSplit> {
    private static final Logger log = LoggerFactory.getLogger(IcebergSourceReader.class);
    private static final long POLL_WAIT_MS = 1000L;
    private final SourceReader.Context context;
    private final Queue<IcebergFileScanTaskSplit> pendingSplits;
    private final Deserializer deserializer;
    private final Schema tableSchema;
    private final Schema projectedSchema;
    private final SourceConfig sourceConfig;
    private IcebergTableLoader icebergTableLoader;
    private IcebergFileScanTaskSplitReader icebergFileScanTaskSplitReader;
    private IcebergFileScanTaskSplit currentReadSplit;
    private boolean noMoreSplitsAssignment;
    private CatalogTable catalogTable;

    public IcebergSourceReader(@NonNull SourceReader.Context context, @NonNull SeaTunnelRowType seaTunnelRowType, @NonNull Schema tableSchema, @NonNull Schema projectedSchema, @NonNull SourceConfig sourceConfig, CatalogTable catalogTable) {
        if (context == null) {
            throw new NullPointerException("context is marked non-null but is null");
        }
        if (seaTunnelRowType == null) {
            throw new NullPointerException("seaTunnelRowType is marked non-null but is null");
        }
        if (tableSchema == null) {
            throw new NullPointerException("tableSchema is marked non-null but is null");
        }
        if (projectedSchema == null) {
            throw new NullPointerException("projectedSchema is marked non-null but is null");
        }
        if (sourceConfig == null) {
            throw new NullPointerException("sourceConfig is marked non-null but is null");
        }
        this.context = context;
        this.pendingSplits = new LinkedList<IcebergFileScanTaskSplit>();
        this.catalogTable = catalogTable;
        this.deserializer = new DefaultDeserializer(seaTunnelRowType, projectedSchema);
        this.tableSchema = tableSchema;
        this.projectedSchema = projectedSchema;
        this.sourceConfig = sourceConfig;
    }

    public void open() {
        this.icebergTableLoader = IcebergTableLoader.create(this.sourceConfig, this.catalogTable);
        this.icebergTableLoader.open();
        this.icebergFileScanTaskSplitReader = new IcebergFileScanTaskSplitReader(this.deserializer, IcebergFileScanTaskReader.builder().fileIO(this.icebergTableLoader.loadTable().io()).tableSchema(this.tableSchema).projectedSchema(this.projectedSchema).caseSensitive(this.sourceConfig.isCaseSensitive()).reuseContainers(true).build());
    }

    public void close() throws IOException {
        if (this.icebergFileScanTaskSplitReader != null) {
            this.icebergFileScanTaskSplitReader.close();
        }
        this.icebergTableLoader.close();
    }

    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        IcebergFileScanTaskSplit pendingSplit = this.pendingSplits.poll();
        while (pendingSplit != null) {
            this.currentReadSplit = pendingSplit;
            try (CloseableIterator<SeaTunnelRow> rowIterator = this.icebergFileScanTaskSplitReader.open(this.currentReadSplit);){
                while (rowIterator.hasNext()) {
                    output.collect((Object)((SeaTunnelRow)rowIterator.next()));
                }
            }
            pendingSplit = this.pendingSplits.poll();
        }
        if (this.noMoreSplitsAssignment && Boundedness.BOUNDED.equals((Object)this.context.getBoundedness())) {
            this.context.signalNoMoreElement();
        } else {
            this.context.sendSplitRequest();
            if (this.pendingSplits.isEmpty()) {
                Thread.sleep(1000L);
            }
        }
    }

    public List<IcebergFileScanTaskSplit> snapshotState(long checkpointId) {
        ArrayList<IcebergFileScanTaskSplit> readerState = new ArrayList<IcebergFileScanTaskSplit>();
        if (!this.pendingSplits.isEmpty()) {
            readerState.addAll(this.pendingSplits);
        }
        if (this.currentReadSplit != null) {
            readerState.add(this.currentReadSplit);
        }
        return readerState;
    }

    public void addSplits(List<IcebergFileScanTaskSplit> splits) {
        log.info("Add {} splits to reader", (Object)splits.size());
        this.pendingSplits.addAll(splits);
    }

    public void handleNoMoreSplits() {
        log.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

