/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSplitReaderThread;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.RecordWithSplitId;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarSourceReader<T>
implements SourceReader<T, PulsarPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceReader.class);
    protected final SourceReader.Context context;
    protected final PulsarClientConfig clientConfig;
    protected final PulsarConsumerConfig consumerConfig;
    protected final StartCursor startCursor;
    protected final Handover<RecordWithSplitId> handover;
    protected final Map<String, PulsarPartitionSplit> splitStates;
    protected final Map<String, PulsarSplitReaderThread> splitReaders;
    protected final SortedMap<Long, Map<String, MessageId>> pendingCursorsToCommit;
    protected final Map<String, MessageId> pendingCursorsToFinish;
    protected final Set<String> finishedSplits;
    protected final DeserializationSchema<T> deserialization;
    protected final int pollTimeout;
    protected final long pollInterval;
    protected final int batchSize;
    protected PulsarClient pulsarClient;
    private boolean noMoreSplitsAssignment = false;

    public PulsarSourceReader(SourceReader.Context context, PulsarClientConfig clientConfig, PulsarConsumerConfig consumerConfig, StartCursor startCursor, DeserializationSchema<T> deserialization, int pollTimeout, long pollInterval, int batchSize) {
        this.context = context;
        this.clientConfig = clientConfig;
        this.consumerConfig = consumerConfig;
        this.startCursor = startCursor;
        this.deserialization = deserialization;
        this.pollTimeout = pollTimeout;
        this.pollInterval = pollInterval;
        this.batchSize = batchSize;
        this.splitStates = new HashMap<String, PulsarPartitionSplit>();
        this.splitReaders = new HashMap<String, PulsarSplitReaderThread>();
        this.pendingCursorsToCommit = Collections.synchronizedSortedMap(new TreeMap());
        this.pendingCursorsToFinish = Collections.synchronizedSortedMap(new TreeMap());
        this.finishedSplits = new TreeSet<String>();
        this.handover = new Handover();
    }

    public void open() {
        this.pulsarClient = PulsarConfigUtil.createClient(this.clientConfig);
    }

    public void close() throws IOException {
        if (this.pulsarClient != null) {
            this.pulsarClient.close();
        }
        for (PulsarSplitReaderThread pulsarSplitReaderThread : this.splitReaders.values()) {
            try {
                pulsarSplitReaderThread.close();
            }
            catch (IOException e) {
                throw new PulsarConnectorException((SeaTunnelErrorCode)CommonErrorCode.READER_OPERATION_FAILED, "Failed to close the split reader thread.", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pollNext(Collector<T> output) throws Exception {
        for (int i = 0; i < this.batchSize; ++i) {
            Optional recordWithSplitId = this.handover.pollNext();
            if (recordWithSplitId.isPresent()) {
                String splitId = ((RecordWithSplitId)recordWithSplitId.get()).getSplitId();
                Message<byte[]> message = ((RecordWithSplitId)recordWithSplitId.get()).getMessage();
                Object object = output.getCheckpointLock();
                synchronized (object) {
                    this.splitStates.get(splitId).setLatestConsumedId(message.getMessageId());
                    this.deserialization.deserialize(message.getData(), output);
                }
            }
            if (!this.noMoreSplitsAssignment || this.finishedSplits.size() != this.splitStates.size()) continue;
            this.context.signalNoMoreElement();
            break;
        }
    }

    public List<PulsarPartitionSplit> snapshotState(long checkpointId) throws Exception {
        List<PulsarPartitionSplit> pendingSplit = this.splitStates.values().stream().map(PulsarPartitionSplit::copy).collect(Collectors.toList());
        int size = pendingSplit.size();
        Map cursors = this.pendingCursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap(size));
        for (PulsarPartitionSplit split : pendingSplit) {
            MessageId latestConsumedId = split.getLatestConsumedId();
            if (latestConsumedId == null) continue;
            cursors.put(split.splitId(), latestConsumedId);
        }
        return pendingSplit;
    }

    public void addSplits(List<PulsarPartitionSplit> splits) {
        for (PulsarPartitionSplit split : splits) {
            this.splitStates.put(split.splitId(), split);
            PulsarSplitReaderThread splitReaderThread = this.createPulsarSplitReaderThread(split);
            try {
                splitReaderThread.setName("Pulsar Source Data Consumer " + split.getPartition().getPartition());
                splitReaderThread.open();
                this.splitReaders.put(split.splitId(), splitReaderThread);
                splitReaderThread.start();
                LOG.info("PulsarSplitReaderThread = {} start", (Object)splitReaderThread.getName());
            }
            catch (PulsarClientException e) {
                throw new PulsarConnectorException((SeaTunnelErrorCode)CommonErrorCode.READER_OPERATION_FAILED, "Failed to start the split reader thread.", e);
            }
        }
    }

    protected PulsarSplitReaderThread createPulsarSplitReaderThread(PulsarPartitionSplit split) {
        return new PulsarSplitReaderThread(this, split, this.pulsarClient, this.consumerConfig, this.pollTimeout, this.pollInterval, this.startCursor, this.handover);
    }

    public void handleNoMoreElements(String splitId, MessageId messageId) {
        LOG.info("Reader received the split {} NoMoreElements event.", (Object)splitId);
        this.pendingCursorsToFinish.put(splitId, messageId);
        if (this.context.getBoundedness() == Boundedness.BOUNDED) {
            this.finishedSplits.add(splitId);
        }
    }

    public void handleNoMoreSplits() {
        LOG.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Committing cursors for checkpoint {}", (Object)checkpointId);
        Map pendingCursors = (Map)this.pendingCursorsToCommit.remove(checkpointId);
        if (pendingCursors == null) {
            LOG.debug("Cursors for checkpoint {} either do not exist or have already been committed.", (Object)checkpointId);
            return;
        }
        pendingCursors.forEach(this::committingCursor);
    }

    private void committingCursor(String splitId, MessageId messageId) {
        block5: {
            if (this.finishedSplits.contains(splitId)) {
                return;
            }
            try {
                PulsarSplitReaderThread pulsarSplitReaderThread = this.splitReaders.get(splitId);
                pulsarSplitReaderThread.committingCursor(messageId);
                if (!this.pendingCursorsToFinish.containsKey(splitId) || this.pendingCursorsToFinish.get(splitId).compareTo(messageId) != 0) break block5;
                this.finishedSplits.add(splitId);
                try {
                    pulsarSplitReaderThread.close();
                }
                catch (IOException e) {
                    throw new PulsarConnectorException((SeaTunnelErrorCode)CommonErrorCode.READER_OPERATION_FAILED, "Failed to close the split reader thread.", e);
                }
            }
            catch (PulsarClientException e) {
                throw new PulsarConnectorException(PulsarConnectorErrorCode.ACK_CUMULATE_FAILED, "pulsar consumer acknowledgeCumulative failed.", e);
            }
        }
    }
}

