/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.AbstractSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumerationResult;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumeratorPosition;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergSplitEnumeratorState;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergScanContext;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergScanSplitPlanner;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split enumerator for streaming reads: it repeatedly plans splits for every pending table and
 * hands them to the registered readers.
 *
 * <p>For each table the enumerator remembers the position up to which splits have already been
 * planned ({@code tableOffsets}), so that the next scan continues from there. Scanning,
 * snapshotting and waiting are all coordinated through the {@code stateLock} monitor inherited
 * from {@link AbstractSplitEnumerator}.
 */
public class IcebergStreamSplitEnumerator
extends AbstractSplitEnumerator {
    private static final Logger log = LoggerFactory.getLogger(IcebergStreamSplitEnumerator.class);

    /** Last planned position per table; the next scan of a table starts from this position. */
    private final ConcurrentMap<TablePath, IcebergEnumeratorPosition> tableOffsets = new ConcurrentHashMap<>();

    /** Set once the first full pass over the pending tables has finished. */
    private volatile boolean initialized = false;

    /**
     * Creates an enumerator without restored state.
     *
     * @param context enumerator context used to reach the readers
     * @param sourceConfig source configuration
     * @param catalogTables catalog tables keyed by table path
     * @param tableSchemaProjections per-table schema pair; the right element is used as the scan
     *     projection
     */
    public IcebergStreamSplitEnumerator(SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context, IcebergSourceConfig sourceConfig, Map<TablePath, CatalogTable> catalogTables, Map<TablePath, Pair<Schema, Schema>> tableSchemaProjections) {
        this(context, sourceConfig, catalogTables, tableSchemaProjections, null);
    }

    /**
     * Creates an enumerator, optionally restoring table offsets from a checkpointed state.
     *
     * @param context enumerator context used to reach the readers
     * @param sourceConfig source configuration
     * @param catalogTables catalog tables keyed by table path
     * @param tableSchemaProjections per-table schema pair; the right element is used as the scan
     *     projection
     * @param state previously snapshotted state, or {@code null} when starting fresh
     * @throws java.util.NoSuchElementException if {@code state} carries a last enumerated
     *     position but {@code catalogTables} is empty
     */
    public IcebergStreamSplitEnumerator(SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context, IcebergSourceConfig sourceConfig, Map<TablePath, CatalogTable> catalogTables, Map<TablePath, Pair<Schema, Schema>> tableSchemaProjections, IcebergSplitEnumeratorState state) {
        super(context, sourceConfig, catalogTables, tableSchemaProjections, state);
        if (state == null) {
            return;
        }
        if (state.getLastEnumeratedPosition() != null) {
            // NOTE(review): presumably a state written before per-table offsets existed; the
            // position is attached to the first catalog table -- confirm against
            // IcebergSplitEnumeratorState#setPendingTable.
            TablePath firstTablePath = catalogTables.values().stream()
                    .findFirst()
                    .orElseThrow(() -> new java.util.NoSuchElementException(
                            "Restored state has a last enumerated position but no catalog table is configured"))
                    .getTablePath();
            state.setPendingTable(firstTablePath);
        }
        this.tableOffsets.putAll(state.getTableOffsets());
    }

    /**
     * Scans all pending tables in an endless loop, assigning the planned splits to the registered
     * readers, and sleeps between passes for the configured increment scan interval (or until
     * {@link #handleSplitRequest(int)} wakes it up).
     *
     * <p>The method never returns normally; it only leaves the loop by throwing.
     *
     * @throws Exception if scanning fails or the thread is interrupted while waiting
     */
    @Override
    public void run() throws Exception {
        Set<Integer> readers = this.context.registeredReaders();
        while (true) {
            for (TablePath tablePath : this.pendingTables) {
                this.checkThrowInterruptedException();
                // Plan and assign under the lock so snapshotState() sees offsets and pending
                // splits that belong together.
                synchronized (this.stateLock) {
                    log.info("Scan table {}.", tablePath);
                    List<IcebergFileScanTaskSplit> splits = this.loadSplits(tablePath);
                    log.info("Scan table {} into {} splits.", tablePath, splits.size());
                    this.addPendingSplits(splits);
                    this.assignPendingSplits(readers);
                }
            }
            if (!this.initialized) {
                this.initialized = true;
            }
            // Object.wait() requires the caller to own the monitor; calling it outside a
            // synchronized block throws IllegalMonitorStateException.
            synchronized (this.stateLock) {
                this.stateLock.wait(this.sourceConfig.getIncrementScanInterval());
            }
        }
    }

    /**
     * Captures a copy of the pending tables, pending splits and table offsets while holding the
     * state lock.
     *
     * @param checkpointId id of the checkpoint being taken (not used by this implementation)
     * @return a state object backed by fresh copies of the enumerator's collections
     */
    @Override
    public IcebergSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
        synchronized (this.stateLock) {
            return new IcebergSplitEnumeratorState(
                    new ArrayList<>(this.pendingTables),
                    new HashMap<>(this.pendingSplits),
                    new HashMap<>(this.tableOffsets));
        }
    }

    /**
     * Wakes up the scan loop in {@link #run()} so it starts the next pass without waiting for the
     * scan interval to elapse. Does nothing until the first pass has completed.
     *
     * @param subtaskId id of the requesting reader (not used by this implementation)
     */
    @Override
    public void handleSplitRequest(int subtaskId) {
        if (this.initialized) {
            // notifyAll() must be called while owning the monitor, otherwise it throws
            // IllegalMonitorStateException.
            synchronized (this.stateLock) {
                this.stateLock.notifyAll();
            }
        }
    }

    /**
     * Plans the next batch of stream splits for one table, starting from the table's recorded
     * offset, and advances that offset on success.
     *
     * <p>Called from {@link #run()} while the state lock is held.
     *
     * @param tablePath table to scan
     * @return the planned splits, or an empty list when the planner's starting position differs
     *     from the recorded offset (in which case the offset is left unchanged)
     */
    private List<IcebergFileScanTaskSplit> loadSplits(TablePath tablePath) {
        Table table = this.loadTable(tablePath);
        IcebergEnumeratorPosition offset = this.tableOffsets.get(tablePath);
        Pair<Schema, Schema> tableSchemaProjection = this.tableSchemaProjections.get(tablePath);
        IcebergScanContext scanContext = IcebergScanContext.streamScanContext(this.sourceConfig, this.sourceConfig.getTableConfig(tablePath), tableSchemaProjection.getRight());
        IcebergEnumerationResult result = IcebergScanSplitPlanner.planStreamSplits(table, scanContext, offset);
        if (!Objects.equals(result.getFromPosition(), offset)) {
            log.info("Skip {} loaded splits because the scan starting position doesn't match the current enumerator position: enumerator position = {}, scan starting position = {}", result.getSplits().size(), offset, result.getFromPosition());
            return Collections.emptyList();
        }
        this.tableOffsets.put(tablePath, result.getToPosition());
        log.debug("Update enumerator position to {}", result.getToPosition());
        return result.getSplits();
    }
}

