/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;

import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.source.MaxcomputeSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.source.MaxcomputeSourceState;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MaxcomputeSourceSplitEnumerator
implements SourceSplitEnumerator<MaxcomputeSourceSplit, MaxcomputeSourceState> {
    private static final Logger log = LoggerFactory.getLogger(MaxcomputeSourceSplitEnumerator.class);
    private final SourceSplitEnumerator.Context<MaxcomputeSourceSplit> enumeratorContext;
    private final Map<Integer, Set<MaxcomputeSourceSplit>> pendingSplits;
    private Set<MaxcomputeSourceSplit> assignedSplits;
    private Config pluginConfig;

    public MaxcomputeSourceSplitEnumerator(SourceSplitEnumerator.Context<MaxcomputeSourceSplit> enumeratorContext, Config pluginConfig) {
        this.enumeratorContext = enumeratorContext;
        this.pluginConfig = pluginConfig;
        this.pendingSplits = new HashMap<Integer, Set<MaxcomputeSourceSplit>>();
        this.assignedSplits = new HashSet<MaxcomputeSourceSplit>();
    }

    public MaxcomputeSourceSplitEnumerator(SourceSplitEnumerator.Context<MaxcomputeSourceSplit> enumeratorContext, Config pluginConfig, MaxcomputeSourceState sourceState) {
        this(enumeratorContext, pluginConfig);
        this.assignedSplits = sourceState.getAssignedSplit();
    }

    public void open() {
    }

    public void run() throws Exception {
        this.discoverySplits();
        this.assignPendingSplits();
    }

    public void close() throws IOException {
    }

    public void addSplitsBack(List<MaxcomputeSourceSplit> splits, int subtaskId) {
        this.addSplitChangeToPendingAssignments(splits);
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplits.size();
    }

    public void registerReader(int subtaskId) {
    }

    public MaxcomputeSourceState snapshotState(long checkpointId) {
        return new MaxcomputeSourceState(this.assignedSplits);
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    public void handleSplitRequest(int subtaskId) {
    }

    private void discoverySplits() throws TunnelException {
        TableTunnel.DownloadSession session = MaxcomputeUtil.getDownloadSession(this.pluginConfig);
        long recordCount = session.getRecordCount();
        int numReaders = this.enumeratorContext.currentParallelism();
        int splitRowNum = (int)Math.ceil((double)recordCount / (double)numReaders);
        int splitRow = (Integer)MaxcomputeConfig.SPLIT_ROW.defaultValue();
        if (this.pluginConfig.hasPath(MaxcomputeConfig.SPLIT_ROW.key())) {
            splitRow = this.pluginConfig.getInt(MaxcomputeConfig.SPLIT_ROW.key());
        }
        HashSet<MaxcomputeSourceSplit> allSplit = new HashSet<MaxcomputeSourceSplit>();
        for (int i = 0; i < numReaders; ++i) {
            int readerStart = i * splitRowNum;
            int readerEnd = (int)Math.min((long)((i + 1) * splitRowNum), recordCount);
            for (int num = readerStart; num < readerEnd; num += splitRow) {
                allSplit.add(new MaxcomputeSourceSplit(num, Math.min(splitRow, readerEnd - num)));
            }
        }
        this.assignedSplits.forEach(allSplit::remove);
        this.addSplitChangeToPendingAssignments(allSplit);
        log.debug("Assigned {} to {} readers.", allSplit, (Object)numReaders);
        log.info("Calculated splits successfully, the size of splits is {}.", (Object)allSplit.size());
    }

    private void addSplitChangeToPendingAssignments(Collection<MaxcomputeSourceSplit> newSplits) {
        for (MaxcomputeSourceSplit split2 : newSplits) {
            int ownerReader = split2.getSplitId() % this.enumeratorContext.currentParallelism();
            this.pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet()).add(split2);
        }
    }

    private void assignPendingSplits() {
        Iterator iterator2 = this.enumeratorContext.registeredReaders().iterator();
        while (iterator2.hasNext()) {
            int pendingReader = (Integer)iterator2.next();
            Set<MaxcomputeSourceSplit> pendingAssignmentForReader = this.pendingSplits.remove(pendingReader);
            if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
                this.assignedSplits.addAll(pendingAssignmentForReader);
                log.info("Assigning splits to readers {} {}", (Object)pendingReader, pendingAssignmentForReader);
                this.enumeratorContext.assignSplit(pendingReader, new ArrayList<MaxcomputeSourceSplit>(pendingAssignmentForReader));
            }
            this.enumeratorContext.signalNoMoreSplits(pendingReader);
        }
    }
}

