/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergSplitEnumeratorState;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Iceberg split enumerators.
 *
 * <p>It keeps two pieces of mutable state: a queue of tables that still have to be processed
 * ({@link #pendingTables}) and, per reader subtask index, the list of splits that have been
 * produced but not yet handed to that reader ({@link #pendingSplits}). Splits are routed to a
 * reader by hashing the split id modulo the current parallelism.
 *
 * <p>{@link #addSplitsBack(List, int)} and {@link #registerReader(int)} mutate
 * {@link #pendingSplits} while holding {@link #stateLock}. NOTE(review): subclasses that touch
 * {@link #pendingSplits} from another thread presumably need to hold the same lock - confirm
 * against the concrete enumerators.
 */
public abstract class AbstractSplitEnumerator
        implements SourceSplitEnumerator<IcebergFileScanTaskSplit, IcebergSplitEnumeratorState> {
    private static final Logger log = LoggerFactory.getLogger(AbstractSplitEnumerator.class);

    /** Enumerator context used to query readers/parallelism and to assign splits. */
    protected final SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context;
    /** Source configuration; also used to load the Iceberg catalog. */
    protected final IcebergSourceConfig sourceConfig;
    /** Catalog tables handled by this enumerator, keyed by table path. */
    protected final Map<TablePath, CatalogTable> tables;
    /** Per-table pair of schemas supplied by the caller. */
    protected final Map<TablePath, Pair<Schema, Schema>> tableSchemaProjections;
    /** Iceberg catalog loaded from {@link #sourceConfig}; closed in {@link #close()} if closeable. */
    protected final Catalog icebergCatalog;
    /** Monitor guarding modifications of {@link #pendingSplits} made by this class. */
    protected final Object stateLock = new Object();
    /** Tables that are still pending; bounded by the number of catalog tables. */
    protected final BlockingQueue<TablePath> pendingTables;
    /** Splits waiting to be assigned, keyed by the owning reader's subtask index. */
    protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits;

    /**
     * Creates an enumerator without restored state; every catalog table starts as pending.
     *
     * @param context enumerator context
     * @param sourceConfig source configuration
     * @param catalogTables tables to enumerate, keyed by table path
     * @param tableSchemaProjections per-table schema pairs
     */
    public AbstractSplitEnumerator(
            SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context,
            IcebergSourceConfig sourceConfig,
            Map<TablePath, CatalogTable> catalogTables,
            Map<TablePath, Pair<Schema, Schema>> tableSchemaProjections) {
        this(context, sourceConfig, catalogTables, tableSchemaProjections, null);
    }

    /**
     * Creates an enumerator, optionally restoring pending tables and splits from {@code state}.
     *
     * <p>When {@code state} is {@code null}, all catalog tables are queued as pending. Otherwise
     * the pending tables are taken from the state and every restored split is re-routed to a
     * reader using the current parallelism. Restored splits that carry no table path are rebuilt
     * with the path of the first catalog table.
     *
     * @param context enumerator context
     * @param sourceConfig source configuration
     * @param catalogTables tables to enumerate, keyed by table path
     * @param tableSchemaProjections per-table schema pairs
     * @param state previously snapshotted state, or {@code null} for a fresh start
     * @throws IllegalStateException if a restored split has no table path and
     *     {@code catalogTables} is empty
     */
    public AbstractSplitEnumerator(
            SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context,
            IcebergSourceConfig sourceConfig,
            Map<TablePath, CatalogTable> catalogTables,
            Map<TablePath, Pair<Schema, Schema>> tableSchemaProjections,
            IcebergSplitEnumeratorState state) {
        this.context = context;
        this.sourceConfig = sourceConfig;
        this.tables = catalogTables;
        this.tableSchemaProjections = tableSchemaProjections;
        this.icebergCatalog = new IcebergCatalogLoader(sourceConfig).loadCatalog();
        this.pendingTables = new ArrayBlockingQueue<>(catalogTables.size());
        this.pendingSplits = new HashMap<>();
        if (state == null) {
            this.pendingTables.addAll(
                    catalogTables.values().stream()
                            .map(CatalogTable::getTablePath)
                            .collect(Collectors.toList()));
        } else {
            this.pendingTables.addAll(state.getPendingTables());
            int numReaders = context.currentParallelism();
            state.getPendingSplits().values().stream()
                    .flatMap(splits -> splits.stream())
                    // The mapped split must be returned; returning null here would make the
                    // forEach below fail with a NullPointerException on every restored split.
                    .map(split -> restoreTablePath(split, catalogTables))
                    .forEach(
                            split ->
                                    this.pendingSplits
                                            .computeIfAbsent(
                                                    getSplitOwner(split.splitId(), numReaders),
                                                    r -> new ArrayList<>())
                                            .add(split));
        }
    }

    /**
     * Returns {@code split} unchanged when it has a table path; otherwise returns a copy of it
     * (same task and record offset) bound to the path of the first catalog table.
     */
    private static IcebergFileScanTaskSplit restoreTablePath(
            IcebergFileScanTaskSplit split, Map<TablePath, CatalogTable> catalogTables) {
        if (split.getTablePath() != null) {
            return split;
        }
        TablePath fallbackPath =
                catalogTables.values().stream()
                        .findFirst()
                        .map(CatalogTable::getTablePath)
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "Cannot restore a split without table path: no catalog table is configured."));
        return new IcebergFileScanTaskSplit(
                fallbackPath, split.getTask(), split.getRecordOffset());
    }

    /** Logs that the enumerator has been opened; performs no other work. */
    public void open() {
        log.info("Open split enumerator.");
    }

    /**
     * Puts splits returned by a reader back into the pending map and, if that reader is currently
     * registered, immediately tries to assign its pending splits again.
     *
     * @param splits splits to re-queue; nothing is queued when the list is empty
     * @param subtaskId subtask index of the reader the splits came back from
     */
    public void addSplitsBack(List<IcebergFileScanTaskSplit> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            synchronized (this.stateLock) {
                this.addPendingSplits(splits);
                if (this.context.registeredReaders().contains(subtaskId)) {
                    this.assignPendingSplits(Collections.singleton(subtaskId));
                } else {
                    log.warn(
                            "Reader {} is not registered. Pending splits {} are not assigned.",
                            subtaskId,
                            splits);
                }
            }
        }
        log.info("Add back splits {} to IcebergSourceEnumerator.", splits.size());
    }

    /**
     * Reports outstanding work: the number of pending tables while any remain, otherwise the
     * total number of pending splits across all readers (0 when there are none).
     *
     * @return count of pending tables, or of pending splits when no table is pending
     */
    public int currentUnassignedSplitSize() {
        if (!this.pendingTables.isEmpty()) {
            return this.pendingTables.size();
        }
        // Summing over an empty map yields 0, so no separate emptiness check is needed.
        return this.pendingSplits.values().stream().mapToInt(List::size).sum();
    }

    /**
     * Intentionally a no-op in this base class.
     *
     * @param subtaskId subtask index of the requesting reader (unused)
     */
    public void handleSplitRequest(int subtaskId) {
    }

    /**
     * Assigns any splits already pending for the newly registered reader.
     *
     * @param subtaskId subtask index of the reader being registered
     */
    public void registerReader(int subtaskId) {
        log.debug("Adding reader {} to IcebergSourceEnumerator.", subtaskId);
        synchronized (this.stateLock) {
            this.assignPendingSplits(Collections.singleton(subtaskId));
        }
    }

    /**
     * Intentionally a no-op in this base class.
     *
     * @param checkpointId id of the completed checkpoint (unused)
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    /**
     * Closes the Iceberg catalog when it implements {@link AutoCloseable}.
     *
     * @throws IOException if closing the catalog fails; checked exceptions other than
     *     {@link IOException} are wrapped so that the declared signature is honoured
     */
    public void close() throws IOException {
        log.info("Close split enumerator.");
        if (this.icebergCatalog instanceof AutoCloseable) {
            try {
                ((AutoCloseable) this.icebergCatalog).close();
            } catch (IOException | RuntimeException e) {
                throw e;
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt status for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                throw new IOException("Failed to close Iceberg catalog.", e);
            }
        }
    }

    /**
     * Loads the Iceberg table identified by the database and table name of {@code tablePath}.
     *
     * @param tablePath path of the table to load
     * @return the table returned by the Iceberg catalog
     */
    protected Table loadTable(TablePath tablePath) {
        return this.icebergCatalog.loadTable(
                TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName()));
    }

    /**
     * Throws if the current thread's interrupt flag is set. The flag is only inspected, not
     * cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    protected void checkThrowInterruptedException() throws InterruptedException {
        if (Thread.currentThread().isInterrupted()) {
            log.info("Enumerator thread is interrupted.");
            throw new InterruptedException("Enumerator thread is interrupted.");
        }
    }

    /**
     * Maps a split id to a reader index in {@code [0, numReaders)}. The sign bit of the hash is
     * masked off so the result is never negative.
     */
    private static int getSplitOwner(String splitId, int numReaders) {
        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    /**
     * Adds each split to the pending list of the reader chosen by hashing its split id against
     * the current parallelism. Within this class it is called while holding {@link #stateLock}.
     *
     * @param newSplits splits to queue
     */
    protected void addPendingSplits(Collection<IcebergFileScanTaskSplit> newSplits) {
        int numReaders = this.context.currentParallelism();
        for (IcebergFileScanTaskSplit newSplit : newSplits) {
            int ownerReader = getSplitOwner(newSplit.splitId(), numReaders);
            this.pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(newSplit);
            log.info("Assigning {} to {} reader.", newSplit, ownerReader);
        }
    }

    /**
     * Hands every pending split of the given readers to the context. If the assignment for a
     * reader throws, the error is logged and that reader's splits are put back as pending.
     * Within this class it is called while holding {@link #stateLock}.
     *
     * @param pendingReaders subtask indexes of the readers to serve
     */
    protected void assignPendingSplits(Set<Integer> pendingReaders) {
        for (int pendingReader : pendingReaders) {
            // Remove first so the splits are not assigned twice.
            List<IcebergFileScanTaskSplit> pendingAssignmentForReader =
                    this.pendingSplits.remove(pendingReader);
            if (pendingAssignmentForReader == null || pendingAssignmentForReader.isEmpty()) {
                continue;
            }
            log.info("Assign splits {} to reader {}", pendingAssignmentForReader, pendingReader);
            try {
                this.context.assignSplit(pendingReader, pendingAssignmentForReader);
            } catch (Exception e) {
                // Best effort: keep the splits so a later call can retry the assignment.
                log.error(
                        "Failed to assign splits {} to reader {}",
                        pendingAssignmentForReader,
                        pendingReader,
                        e);
                this.pendingSplits.put(pendingReader, pendingAssignmentForReader);
            }
        }
    }
}

