/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.LatestMessageStopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarSplitEnumerator
implements SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSplitEnumerator.class);
    private final SourceSplitEnumerator.Context<PulsarPartitionSplit> context;
    private final PulsarAdminConfig adminConfig;
    private final PulsarDiscoverer partitionDiscoverer;
    private final long partitionDiscoveryIntervalMs;
    private final StartCursor startCursor;
    private final StopCursor stopCursor;
    private final String subscriptionName;
    private final Set<TopicPartition> assignedPartitions;
    private final Map<Integer, Set<PulsarPartitionSplit>> pendingPartitionSplits;
    private PulsarAdmin pulsarAdmin;
    private boolean noMoreNewPartitionSplits = false;
    private ScheduledThreadPoolExecutor executor = null;

    public PulsarSplitEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> context, PulsarAdminConfig adminConfig, PulsarDiscoverer partitionDiscoverer, long partitionDiscoveryIntervalMs, StartCursor startCursor, StopCursor stopCursor, String subscriptionName) {
        this(context, adminConfig, partitionDiscoverer, partitionDiscoveryIntervalMs, startCursor, stopCursor, subscriptionName, Collections.emptySet());
    }

    public PulsarSplitEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> context, PulsarAdminConfig adminConfig, PulsarDiscoverer partitionDiscoverer, long partitionDiscoveryIntervalMs, StartCursor startCursor, StopCursor stopCursor, String subscriptionName, Set<TopicPartition> assignedPartitions) {
        if (partitionDiscoverer instanceof TopicPatternDiscoverer && partitionDiscoveryIntervalMs > 0L && Boundedness.BOUNDED == stopCursor.getBoundedness()) {
            throw new PulsarConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_OPERATION, "Bounded streams do not support dynamic partition discovery.");
        }
        this.context = context;
        this.adminConfig = adminConfig;
        this.partitionDiscoverer = partitionDiscoverer;
        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
        this.startCursor = startCursor;
        this.stopCursor = stopCursor;
        this.subscriptionName = subscriptionName;
        this.assignedPartitions = new HashSet<TopicPartition>(assignedPartitions);
        this.pendingPartitionSplits = new HashMap<Integer, Set<PulsarPartitionSplit>>();
    }

    public void open() {
        this.pulsarAdmin = PulsarConfigUtil.createAdmin(this.adminConfig);
    }

    public void run() throws Exception {
        if (this.partitionDiscoveryIntervalMs > 0L) {
            this.executor = new ScheduledThreadPoolExecutor(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("pulsar-split-discovery-executor");
                return thread;
            });
            this.executor.scheduleAtFixedRate(this::discoverySplits, 0L, this.partitionDiscoveryIntervalMs, TimeUnit.MILLISECONDS);
        } else {
            this.discoverySplits();
        }
    }

    private void discoverySplits() {
        Set<TopicPartition> subscribedTopicPartitions = this.partitionDiscoverer.getSubscribedTopicPartitions(this.pulsarAdmin);
        this.checkPartitionChanges(subscribedTopicPartitions);
    }

    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions) {
        Set<TopicPartition> newPartitions = this.getNewPartitions(fetchedPartitions);
        if (this.partitionDiscoveryIntervalMs <= 0L && !this.noMoreNewPartitionSplits) {
            LOG.debug("Partition discovery is disabled.");
            this.noMoreNewPartitionSplits = true;
        }
        if (newPartitions.isEmpty()) {
            return;
        }
        List<PulsarPartitionSplit> newSplits = newPartitions.stream().map(this::createPulsarPartitionSplit).collect(Collectors.toList());
        this.addPartitionSplitChangeToPendingAssignments(newSplits);
        this.assignPendingPartitionSplits(this.context.registeredReaders());
    }

    private PulsarPartitionSplit createPulsarPartitionSplit(TopicPartition partition) {
        StopCursor partitionStopCursor = this.stopCursor.copy();
        PulsarPartitionSplit split = new PulsarPartitionSplit(partition, partitionStopCursor);
        if (partitionStopCursor instanceof LatestMessageStopCursor) {
            ((LatestMessageStopCursor)partitionStopCursor).prepare(this.pulsarAdmin, partition);
        }
        if (this.startCursor instanceof SubscriptionStartCursor) {
            ((SubscriptionStartCursor)this.startCursor).ensureSubscription(this.subscriptionName, partition, this.pulsarAdmin);
        }
        return split;
    }

    private Set<TopicPartition> getNewPartitions(Set<TopicPartition> fetchedPartitions) {
        Consumer<TopicPartition> duplicateOrMarkAsRemoved = fetchedPartitions::remove;
        this.assignedPartitions.forEach(duplicateOrMarkAsRemoved);
        this.pendingPartitionSplits.forEach((reader, splits) -> splits.forEach(split -> duplicateOrMarkAsRemoved.accept(split.getPartition())));
        if (!fetchedPartitions.isEmpty()) {
            LOG.info("Discovered new partitions: {}", fetchedPartitions);
        }
        return fetchedPartitions;
    }

    private void addPartitionSplitChangeToPendingAssignments(Collection<PulsarPartitionSplit> newPartitionSplits) {
        int numReaders = this.context.currentParallelism();
        for (PulsarPartitionSplit split : newPartitionSplits) {
            int ownerReader = PulsarSplitEnumerator.getSplitOwner(split.getPartition(), numReaders);
            this.pendingPartitionSplits.computeIfAbsent(ownerReader, r -> new HashSet()).add(split);
        }
        LOG.debug("Assigned {} to {} readers of subscription {}.", new Object[]{newPartitionSplits, numReaders, this.subscriptionName});
    }

    static int getSplitOwner(TopicPartition tp, int numReaders) {
        int startIndex = (tp.getTopic().hashCode() * 31 & Integer.MAX_VALUE) % numReaders;
        return (startIndex + tp.getPartition()) % numReaders;
    }

    private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {
        for (int pendingReader : pendingReaders) {
            Set<PulsarPartitionSplit> pendingAssignmentForReader = this.pendingPartitionSplits.remove(pendingReader);
            if (pendingAssignmentForReader == null || pendingAssignmentForReader.isEmpty()) continue;
            pendingAssignmentForReader.forEach(split -> this.assignedPartitions.add(split.getPartition()));
            LOG.info("Assigning splits to readers {}", pendingAssignmentForReader);
            this.context.assignSplit(pendingReader, new ArrayList<PulsarPartitionSplit>(pendingAssignmentForReader));
        }
        if (this.noMoreNewPartitionSplits && this.stopCursor.getBoundedness() == Boundedness.BOUNDED) {
            LOG.debug("No more PulsarPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {} in subscription {}.", pendingReaders, (Object)this.subscriptionName);
            pendingReaders.forEach(arg_0 -> this.context.signalNoMoreSplits(arg_0));
        }
    }

    public void close() throws IOException {
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
        this.addPartitionSplitChangeToPendingAssignments(splits);
        if (this.context.registeredReaders().contains(subtaskId)) {
            this.assignPendingPartitionSplits(Collections.singleton(subtaskId));
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingPartitionSplits.size();
    }

    public void handleSplitRequest(int subtaskId) {
    }

    public void registerReader(int subtaskId) {
        LOG.debug("Adding reader {} to PulsarSourceEnumerator for subscription {}.", (Object)subtaskId, (Object)this.subscriptionName);
        this.assignPendingPartitionSplits(Collections.singleton(subtaskId));
    }

    public PulsarSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
        return new PulsarSplitEnumeratorState(this.assignedPartitions);
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

