/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.NonNull;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceTableConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.data.DefaultDeserializer;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader.IcebergFileScanTaskReader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader.IcebergFileScanTaskSplitReader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SourceReader} that consumes {@link IcebergFileScanTaskSplit}s from an internal queue and
 * emits every row of each split as a {@link SeaTunnelRow}.
 *
 * <p>One {@link IcebergFileScanTaskSplitReader} is cached per {@link TablePath}. When the reader
 * context reports {@link Boundedness#BOUNDED}, the cached readers of other tables are closed
 * before a reader for a new table is created.
 */
public class IcebergSourceReader implements SourceReader<SeaTunnelRow, IcebergFileScanTaskSplit> {
    private static final Logger log = LoggerFactory.getLogger(IcebergSourceReader.class);

    /** How long {@link #pollNext(Collector)} idles when no split is available, in milliseconds. */
    private static final long POLL_WAIT_MS = 1000L;

    private final SourceReader.Context context;
    private final IcebergSourceConfig sourceConfig;
    private final Map<TablePath, CatalogTable> tables;
    /** Per table: left = table schema, right = projected schema (see getOrCreateTableReader). */
    private final Map<TablePath, Pair<Schema, Schema>> tableSchemaProjections;
    private final BlockingQueue<IcebergFileScanTaskSplit> pendingSplits;
    private final ConcurrentMap<TablePath, IcebergFileScanTaskSplitReader> tableReaders;

    /** Split currently being emitted by {@link #pollNext(Collector)}; null when none is in flight. */
    private volatile IcebergFileScanTaskSplit currentReadSplit;
    private volatile boolean noMoreSplitsAssignment;
    /** Loaded in {@link #open()}; null until then. */
    private Catalog catalog;

    /**
     * Creates a reader with an empty split queue and an empty table-reader cache.
     *
     * @param context reader context used for boundedness checks, split requests and end-of-input
     *     signalling
     * @param sourceConfig connector configuration used to load the catalog and per-table settings
     * @param tables catalog table definitions keyed by table path
     * @param tableSchemaProjections (table schema, projected schema) pairs keyed by table path
     * @throws NullPointerException if any argument is null
     */
    public IcebergSourceReader(
            @NonNull SourceReader.Context context,
            @NonNull IcebergSourceConfig sourceConfig,
            @NonNull Map<TablePath, CatalogTable> tables,
            @NonNull Map<TablePath, Pair<Schema, Schema>> tableSchemaProjections) {
        if (context == null) {
            throw new NullPointerException("context is marked non-null but is null");
        }
        if (sourceConfig == null) {
            throw new NullPointerException("sourceConfig is marked non-null but is null");
        }
        if (tables == null) {
            throw new NullPointerException("tables is marked non-null but is null");
        }
        if (tableSchemaProjections == null) {
            throw new NullPointerException("tableSchemaProjections is marked non-null but is null");
        }
        this.context = context;
        this.sourceConfig = sourceConfig;
        this.tables = tables;
        this.tableSchemaProjections = tableSchemaProjections;
        this.pendingSplits = new LinkedBlockingQueue<>();
        this.tableReaders = new ConcurrentHashMap<>();
    }

    /** Loads the Iceberg catalog described by the source configuration. */
    @Override
    public void open() {
        IcebergCatalogLoader catalogFactory = new IcebergCatalogLoader(this.sourceConfig);
        this.catalog = catalogFactory.loadCatalog();
    }

    /**
     * Closes the catalog (when it is {@link Closeable}) and every cached table reader.
     *
     * @throws IOException if closing the catalog fails; the table readers are still closed first
     */
    @Override
    public void close() throws IOException {
        try {
            // instanceof is false for null, so no separate null check is needed.
            if (this.catalog instanceof Closeable) {
                ((Closeable) this.catalog).close();
            }
        } finally {
            // Release the readers even when the catalog fails to close.
            closeTableReaders();
        }
    }

    /** Closes all cached table readers and empties the cache. */
    private void closeTableReaders() {
        this.tableReaders.forEach((tablePath, reader) -> reader.close());
        this.tableReaders.clear();
    }

    /**
     * Returns the cached split reader for {@code tablePath}, creating it on first use.
     *
     * <p>In bounded mode the readers cached for other tables are closed and dropped before the
     * new reader is created.
     */
    private IcebergFileScanTaskSplitReader getOrCreateTableReader(TablePath tablePath) {
        IcebergFileScanTaskSplitReader cached = this.tableReaders.get(tablePath);
        if (cached != null) {
            return cached;
        }
        if (Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
            closeTableReaders();
        }
        return this.tableReaders.computeIfAbsent(tablePath, this::createTableReader);
    }

    /** Builds a split reader for one table from its configuration and schema projection. */
    private IcebergFileScanTaskSplitReader createTableReader(TablePath tablePath) {
        SourceTableConfig tableConfig = this.sourceConfig.getTableConfig(tablePath);
        CatalogTable catalogTable = this.tables.get(tablePath);
        Pair<Schema, Schema> schemas = this.tableSchemaProjections.get(tablePath);
        Schema tableSchema = schemas.getLeft();
        Schema projectedSchema = schemas.getRight();
        DefaultDeserializer deserializer =
                new DefaultDeserializer(catalogTable.getSeaTunnelRowType(), projectedSchema);
        Table icebergTable = this.catalog.loadTable(tableConfig.getTableIdentifier());
        return new IcebergFileScanTaskSplitReader(
                deserializer,
                IcebergFileScanTaskReader.builder()
                        .fileIO(icebergTable.io())
                        .tableSchema(tableSchema)
                        .projectedSchema(projectedSchema)
                        .caseSensitive(this.sourceConfig.isCaseSensitive())
                        .reuseContainers(true)
                        .build());
    }

    /**
     * Emits all rows of the next pending split, if there is one.
     *
     * <p>The whole split is read while holding the collector's checkpoint lock. When no split is
     * pending, the reader either signals end of input (bounded mode, after
     * {@link #handleNoMoreSplits()}) or requests another split and, if the queue is still empty,
     * sleeps for {@link #POLL_WAIT_MS}.
     *
     * @param output collector receiving the rows
     * @throws Exception if reading the split fails or the idle sleep is interrupted
     */
    @Override
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        synchronized (output.getCheckpointLock()) {
            IcebergFileScanTaskSplit split = this.pendingSplits.poll();
            this.currentReadSplit = split;
            if (split != null) {
                IcebergFileScanTaskSplitReader tableReader =
                        getOrCreateTableReader(split.getTablePath());
                try (CloseableIterator<SeaTunnelRow> rowIterator = tableReader.open(split)) {
                    while (rowIterator.hasNext()) {
                        output.collect(rowIterator.next());
                    }
                }
                // Fully emitted: stop reporting this split from snapshotState. On failure the
                // field is intentionally left set (the exception skips this line).
                this.currentReadSplit = null;
                return;
            }
        }
        if (this.noMoreSplitsAssignment
                && Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
            this.context.signalNoMoreElement();
        } else {
            this.context.sendSplitRequest();
            if (this.pendingSplits.isEmpty()) {
                Thread.sleep(POLL_WAIT_MS);
            }
        }
    }

    /**
     * Returns the splits this reader still owns: all pending splits plus the in-flight split, if
     * any.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @return a new list; never null
     */
    @Override
    public List<IcebergFileScanTaskSplit> snapshotState(long checkpointId) {
        List<IcebergFileScanTaskSplit> readerState = new ArrayList<>(this.pendingSplits);
        // Read the volatile field once so the null check and the add see the same value.
        IcebergFileScanTaskSplit inFlight = this.currentReadSplit;
        if (inFlight != null) {
            readerState.add(inFlight);
        }
        return readerState;
    }

    /**
     * Queues newly assigned splits for reading.
     *
     * @param splits splits to append to the pending queue
     */
    @Override
    public void addSplits(List<IcebergFileScanTaskSplit> splits) {
        log.info("Add {} splits to reader", splits.size());
        this.pendingSplits.addAll(splits);
    }

    /** Records that no further splits will be assigned to this reader. */
    @Override
    public void handleNoMoreSplits() {
        log.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    /** No-op: this reader takes no action when a checkpoint completes. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

