/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

class SynchronousReceiveWork {
    private static final Duration TIMEOUT_BETWEEN_MESSAGES = Duration.ofMillis(1000L);
    private final ClientLogger logger = new ClientLogger(SynchronousReceiveWork.class);
    private final AtomicBoolean isStarted = new AtomicBoolean();
    private final Duration timeout;
    private final long id;
    private final AtomicInteger remaining;
    private final int numberToReceive;
    private final Sinks.Many<ServiceBusReceivedMessage> downstreamEmitter;
    private final Disposable.Composite timeoutSubscriptions;
    private final AtomicBoolean isTerminal = new AtomicBoolean();

    SynchronousReceiveWork(long id, int numberToReceive, Duration timeout, Sinks.Many<ServiceBusReceivedMessage> emitter) {
        this.id = id;
        this.remaining = new AtomicInteger(numberToReceive);
        this.numberToReceive = numberToReceive;
        this.timeout = timeout;
        this.downstreamEmitter = emitter;
        this.timeoutSubscriptions = Disposables.composite();
    }

    long getId() {
        return this.id;
    }

    Duration getTimeout() {
        return this.timeout;
    }

    int getNumberOfEvents() {
        return this.numberToReceive;
    }

    int getRemainingEvents() {
        return this.remaining.get();
    }

    synchronized void start() {
        if (this.isStarted.getAndSet(true)) {
            return;
        }
        this.timeoutSubscriptions.add(Mono.delay((Duration)this.timeout).subscribe(index -> this.complete("Timeout elapsed for work."), error -> this.complete("Error occurred while waiting for timeout.", (Throwable)error)));
        this.timeoutSubscriptions.add(Flux.switchOnNext((Publisher)this.downstreamEmitter.asFlux().map(messageContext -> Mono.delay((Duration)TIMEOUT_BETWEEN_MESSAGES))).subscribe(delayElapsed -> this.complete("Timeout between the messages occurred. Completing the work."), error -> this.complete("Error occurred while waiting for timeout between messages.", (Throwable)error)));
    }

    synchronized boolean isTerminal() {
        return this.isTerminal.get();
    }

    synchronized boolean emitNext(ServiceBusReceivedMessage message) {
        int numberLeft;
        if (this.isTerminal.get()) {
            return false;
        }
        if (!this.isStarted.get()) {
            this.start();
        }
        if ((numberLeft = this.remaining.decrementAndGet()) < 0) {
            this.logger.info("workId[{}] Number left {} < 0. Not emitting downstream.", new Object[]{this.id, numberLeft});
            return false;
        }
        Sinks.EmitResult result = this.downstreamEmitter.tryEmitNext((Object)message);
        if (result != Sinks.EmitResult.OK) {
            this.logger.info("workId[{}] Could not emit downstream. EmitResult: {}", new Object[]{this.id, result});
            return false;
        }
        if (numberLeft == 0) {
            this.complete(null);
        }
        return true;
    }

    void complete(String message) {
        this.complete(message, null);
    }

    void complete(String message, Throwable error) {
        if (this.isTerminal.getAndSet(true)) {
            return;
        }
        if (message != null) {
            if (error == null) {
                this.logger.verbose("workId[{}] {}", new Object[]{this.id, message});
            } else {
                this.logger.warning("workId[{}] {}", new Object[]{this.id, message, error});
            }
        }
        try {
            this.timeoutSubscriptions.dispose();
        }
        finally {
            if (error == null) {
                this.downstreamEmitter.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
            } else {
                this.downstreamEmitter.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST);
            }
        }
    }
}

