/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.NonNull;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.enumerator.TiDBSourceCheckpointState;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils.TableKeyRangeUtils;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiSession;
import org.tikv.kvproto.Coprocessor;

public class TiDBSourceSplitEnumerator
implements SourceSplitEnumerator<TiDBSourceSplit, TiDBSourceCheckpointState> {
    private static final Logger log = LoggerFactory.getLogger(TiDBSourceSplitEnumerator.class);
    private final TiDBSourceConfig sourceConfig;
    private final Map<Integer, TiDBSourceSplit> assignedSplit;
    private final Map<Integer, TiDBSourceSplit> pendingSplit;
    private final SourceSplitEnumerator.Context<TiDBSourceSplit> context;
    private TiSession tiSession;
    private long tableId;
    private volatile boolean shouldEnumerate;
    private final Object stateLock = new Object();

    public TiDBSourceSplitEnumerator(@NonNull SourceSplitEnumerator.Context<TiDBSourceSplit> context, @NonNull TiDBSourceConfig sourceConfig) {
        this(context, sourceConfig, null);
        if (context == null) {
            throw new NullPointerException("context is marked non-null but is null");
        }
        if (sourceConfig == null) {
            throw new NullPointerException("sourceConfig is marked non-null but is null");
        }
    }

    public TiDBSourceSplitEnumerator(@NonNull SourceSplitEnumerator.Context<TiDBSourceSplit> context, @NonNull TiDBSourceConfig sourceConfig, TiDBSourceCheckpointState restoreState) {
        if (context == null) {
            throw new NullPointerException("context is marked non-null but is null");
        }
        if (sourceConfig == null) {
            throw new NullPointerException("sourceConfig is marked non-null but is null");
        }
        this.context = context;
        this.sourceConfig = sourceConfig;
        this.assignedSplit = new HashMap<Integer, TiDBSourceSplit>();
        this.pendingSplit = new HashMap<Integer, TiDBSourceSplit>();
        boolean bl = this.shouldEnumerate = restoreState == null;
        if (restoreState != null) {
            this.shouldEnumerate = restoreState.isShouldEnumerate();
            this.pendingSplit.putAll(restoreState.getPendingSplit());
        }
    }

    public void open() {
        this.tiSession = TiSession.create(this.sourceConfig.getTiConfiguration());
        this.tableId = this.tiSession.getCatalog().getTable(this.sourceConfig.getDatabaseName(), this.sourceConfig.getTableName()).getId();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() throws Exception {
        Set readers = this.context.registeredReaders();
        if (this.shouldEnumerate) {
            List<TiDBSourceSplit> sourceSplits = this.getTiDBSourceSplit();
            Object object = this.stateLock;
            synchronized (object) {
                this.addPendingSplit(sourceSplits);
                this.fetchAssignedSplit();
                this.shouldEnumerate = false;
                this.assignSplit(readers);
            }
        }
        log.debug("No more splits to assign. Sending NoMoreSplitsEvent to reader {}.", (Object)readers);
        readers.forEach(arg_0 -> this.context.signalNoMoreSplits(arg_0));
    }

    private void fetchAssignedSplit() {
        for (Map.Entry<Integer, TiDBSourceSplit> split : this.pendingSplit.entrySet()) {
            if (!this.assignedSplit.containsKey(split.getKey())) continue;
            this.pendingSplit.put(split.getKey(), split.getValue());
        }
    }

    private synchronized void addPendingSplit(List<TiDBSourceSplit> splits) {
        splits.forEach(split -> this.pendingSplit.put(TiDBSourceSplitEnumerator.getSplitOwner(split.splitId(), this.context.currentParallelism()), (TiDBSourceSplit)split));
    }

    private void assignSplit(Collection<Integer> readers) {
        for (Integer reader : readers) {
            TiDBSourceSplit assignmentForReader = this.pendingSplit.remove(reader);
            if (assignmentForReader == null) continue;
            log.debug("Assign splits {} to reader {}", (Object)assignmentForReader, (Object)reader);
            this.context.assignSplit(reader.intValue(), (SourceSplit)assignmentForReader);
        }
    }

    private static int getSplitOwner(String splitId, int numReaders) {
        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    private List<TiDBSourceSplit> getTiDBSourceSplit() {
        ArrayList sourceSplits = Lists.newArrayList();
        List<Coprocessor.KeyRange> keyRanges = TableKeyRangeUtils.getTableKeyRanges(this.tableId, this.context.currentParallelism());
        for (Coprocessor.KeyRange keyRange : keyRanges) {
            sourceSplits.add(new TiDBSourceSplit(this.sourceConfig.getDatabaseName(), this.sourceConfig.getTableName(), keyRange, this.sourceConfig.getStartupMode() == StartupMode.INITIAL ? -1L : 0L, keyRange.getStart(), false));
        }
        return sourceSplits;
    }

    public void close() throws IOException {
        if (this.tiSession != null) {
            try {
                this.tiSession.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void addSplitsBack(List<TiDBSourceSplit> splits, int subtaskId) {
        log.debug("Add back splits {} to TiDBSourceSplitEnumerator.", splits);
        if (!splits.isEmpty()) {
            this.addPendingSplit(splits);
            if (this.context.registeredReaders().contains(subtaskId)) {
                this.assignSplit(Collections.singletonList(subtaskId));
            } else {
                log.warn("Reader {} is not registered. Pending splits {} are not assigned.", (Object)subtaskId, splits);
            }
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplit.size();
    }

    public void handleSplitRequest(int subtaskId) {
    }

    public void registerReader(int subtaskId) {
        log.debug("Register reader {} to TiDBSourceSplitEnumerator.", (Object)subtaskId);
        if (!this.pendingSplit.isEmpty()) {
            this.assignSplit(Collections.singletonList(subtaskId));
        }
    }

    public TiDBSourceCheckpointState snapshotState(long checkpointId) throws Exception {
        return new TiDBSourceCheckpointState(this.shouldEnumerate, this.pendingSplit);
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

