/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb.events;

import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Threads;
import java.io.Closeable;
import java.time.Duration;
import java.time.Instant;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class BufferingChangeStreamCursor<TResult>
implements MongoChangeStreamCursor<ResumableChangeStreamEvent<TResult>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BufferingChangeStreamCursor.class);
    public static final int THROTTLE_NO_MESSAGE_BEFORE_PAUSE = 5;
    public static final int FETCHER_SHUTDOWN_TIMEOUT = 30;
    private final EventFetcher<TResult> fetcher;
    private final ExecutorService executor;
    private final DelayStrategy throttler;
    private BsonDocument lastResumeToken = null;

    public static <TResult> BufferingChangeStreamCursor<TResult> fromIterable(ChangeStreamIterable<TResult> stream, MongoDbTaskContext taskContext, MongoDbStreamingChangeEventSourceMetrics metrics, Clock clock) {
        MongoDbConnectorConfig config = taskContext.getConnectorConfig();
        return new BufferingChangeStreamCursor<TResult>(new EventFetcher<TResult>(stream, config.getMaxBatchSize(), metrics, clock, config.getPollInterval()), Threads.newFixedThreadPool(MongoDbConnector.class, (String)taskContext.getServerName(), (String)"replicator-fetcher", (int)1), config.getPollInterval());
    }

    public BufferingChangeStreamCursor(EventFetcher<TResult> fetcher, ExecutorService executor, DelayStrategy throttler) {
        this.fetcher = fetcher;
        this.executor = executor;
        this.throttler = throttler;
    }

    public BufferingChangeStreamCursor(EventFetcher<TResult> fetcher, ExecutorService executor, Duration throttleMaxSleep) {
        this(fetcher, executor, DelayStrategy.boundedExponential((Duration)Duration.ofMillis(1L), (Duration)throttleMaxSleep, (double)2.0));
    }

    public BufferingChangeStreamCursor<TResult> start() {
        LOGGER.info("Fetcher submitted for execution: {} @ {}", this.fetcher, (Object)this.executor);
        this.executor.submit(this.fetcher);
        return this;
    }

    public ResumableChangeStreamEvent<TResult> tryNext() {
        ResumableChangeStreamEvent<TResult> event = this.pollWithDelay();
        if (event != null) {
            this.lastResumeToken = event.resumeToken;
        }
        return event;
    }

    public ResumableChangeStreamEvent<TResult> next() {
        if (!this.hasNext()) {
            throw new NoSuchElementException();
        }
        return this.tryNext();
    }

    private ResumableChangeStreamEvent<TResult> pollWithDelay() {
        ResumableChangeStreamEvent<TResult> event;
        boolean slept;
        while (slept = this.throttler.sleepWhen((event = this.fetcher.poll()) == null)) {
        }
        return event;
    }

    public boolean hasNext() {
        return !this.fetcher.isEmpty();
    }

    public int available() {
        return this.fetcher.size();
    }

    public BsonDocument getResumeToken() {
        return this.lastResumeToken;
    }

    public ServerCursor getServerCursor() {
        return this.fetcher.cursorRef.get().getServerCursor();
    }

    public ServerAddress getServerAddress() {
        return this.fetcher.cursorRef.get().getServerAddress();
    }

    public void close() {
        this.fetcher.close();
        this.executor.shutdown();
        try {
            LOGGER.info("Awaiting fetcher thread termination");
            this.executor.awaitTermination(30L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for fetcher thread shutdown");
        }
    }

    public static final class EventFetcher<TResult>
    implements Runnable,
    Closeable {
        public static final long QUEUE_OFFER_TIMEOUT_MS = 100L;
        private final ChangeStreamIterable<TResult> stream;
        private final Semaphore capacity;
        private final Queue<ResumableChangeStreamEvent<TResult>> queue;
        private final DelayStrategy throttler;
        private final AtomicBoolean running;
        private final AtomicReference<MongoChangeStreamCursor<ChangeStreamDocument<TResult>>> cursorRef;
        private final AtomicReference<Throwable> error;
        private final MongoDbStreamingChangeEventSourceMetrics metrics;
        private final Clock clock;
        private int noMessageIterations = 0;

        public EventFetcher(ChangeStreamIterable<TResult> stream, int capacity, MongoDbStreamingChangeEventSourceMetrics metrics, Clock clock, DelayStrategy throttler) {
            this.stream = stream;
            this.capacity = new Semaphore(capacity);
            this.metrics = metrics;
            this.clock = clock;
            this.throttler = throttler;
            this.running = new AtomicBoolean(false);
            this.cursorRef = new AtomicReference<Object>(null);
            this.queue = new ConcurrentLinkedQueue<ResumableChangeStreamEvent<TResult>>();
            this.error = new AtomicReference<Object>(null);
        }

        public EventFetcher(ChangeStreamIterable<TResult> stream, int capacity, MongoDbStreamingChangeEventSourceMetrics metrics, Clock clock, Duration throttleMaxSleep) {
            this(stream, capacity, metrics, clock, DelayStrategy.constant((Duration)throttleMaxSleep));
        }

        public boolean isRunning() {
            return this.running.get();
        }

        public boolean hasError() {
            return this.error.get() != null;
        }

        public Throwable getError() {
            return this.error.get();
        }

        @Override
        public void close() {
            this.running.set(false);
        }

        public ResumableChangeStreamEvent<TResult> poll() {
            ResumableChangeStreamEvent<TResult> event = this.queue.poll();
            if (event == null) {
                if (this.hasError()) {
                    throw new DebeziumException("Unable to fetch change stream events", this.getError());
                }
            } else {
                this.capacity.release();
            }
            return event;
        }

        public boolean isEmpty() {
            return this.queue.isEmpty();
        }

        public int size() {
            return this.queue.size();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try (MongoChangeStreamCursor cursor = this.stream.cursor();){
                this.cursorRef.compareAndSet(null, cursor);
                this.running.set(true);
                this.noMessageIterations = 0;
                this.fetchEvents(cursor);
            }
            catch (InterruptedException e) {
                LOGGER.error("Fetcher thread interrupted", (Throwable)e);
                throw new DebeziumException("Fetcher thread interrupted", (Throwable)e);
            }
            catch (Throwable e) {
                this.error.set(e);
                LOGGER.error("Fetcher thread has failed", e);
            }
            finally {
                this.cursorRef.set(null);
                this.close();
            }
        }

        private void fetchEvents(MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor) throws InterruptedException {
            ResumableChangeStreamEvent<TResult> lastEvent = null;
            boolean repeat = false;
            while (this.isRunning()) {
                if (!repeat) {
                    Optional<ResumableChangeStreamEvent<TResult>> maybeEvent = this.fetchEvent(cursor);
                    if (maybeEvent.isEmpty()) {
                        LOGGER.warn("Resume token not available on this poll");
                        continue;
                    }
                    lastEvent = maybeEvent.get();
                }
                repeat = !this.enqueue(lastEvent);
            }
        }

        private Optional<ResumableChangeStreamEvent<TResult>> fetchEvent(MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor) {
            Instant beforeEventPollTime = this.clock.currentTimeAsInstant();
            ChangeStreamDocument document = (ChangeStreamDocument)cursor.tryNext();
            this.metrics.onSourceEventPolled(document, this.clock, beforeEventPollTime);
            this.throttleIfNeeded(document);
            return Optional.empty().or(() -> Optional.ofNullable(document).map(ResumableChangeStreamEvent::new)).or(() -> Optional.ofNullable(cursor.getResumeToken()).map(ResumableChangeStreamEvent::new));
        }

        private void throttleIfNeeded(ChangeStreamDocument<TResult> document) {
            if (document == null) {
                ++this.noMessageIterations;
            }
            if (this.noMessageIterations >= 5) {
                LOGGER.debug("Sleeping after {} empty polls", (Object)this.noMessageIterations);
                this.throttler.sleepWhen(true);
                this.noMessageIterations = 0;
            }
        }

        private boolean enqueue(ResumableChangeStreamEvent<TResult> event) throws InterruptedException {
            boolean available = this.capacity.tryAcquire(100L, TimeUnit.MILLISECONDS);
            if (!available) {
                LOGGER.warn("Unable to acquire buffer lock, buffer queue is likely full");
                return false;
            }
            return this.queue.offer(event);
        }
    }

    @Immutable
    public static final class ResumableChangeStreamEvent<TResult> {
        public final Optional<ChangeStreamDocument<TResult>> document;
        public final BsonDocument resumeToken;

        public ResumableChangeStreamEvent(ChangeStreamDocument<TResult> document) {
            Objects.requireNonNull(document);
            this.document = Optional.of(document);
            this.resumeToken = document.getResumeToken();
        }

        public ResumableChangeStreamEvent(BsonDocument resumeToken) {
            Objects.requireNonNull(resumeToken);
            this.document = Optional.empty();
            this.resumeToken = resumeToken;
        }

        public boolean isEmpty() {
            return this.document.isEmpty();
        }

        public boolean hasDocument() {
            return this.document.isPresent();
        }

        public String toString() {
            return this.document.map(ChangeStreamDocument::toString).orElseGet(() -> ((BsonDocument)this.resumeToken).toString());
        }
    }
}

