/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpExceptionHandler;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import java.io.Closeable;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.reactor.Reactor;
import reactor.core.scheduler.Scheduler;

/**
 * Drives a proton-j {@link Reactor} by repeatedly scheduling {@link Reactor#process()} on a
 * {@link Scheduler}, one invocation per scheduled task.
 *
 * <p>Errors raised while processing are reported to the supplied {@link AmqpExceptionHandler}; when the
 * reactor is stopped through {@link #close()} (or through the internal stop path) the handler is notified
 * with an {@link AmqpShutdownSignal}.</p>
 *
 * <p>Calls to {@code reactor.start()}, {@code reactor.process()} in the run loop, and the stop/free pair in
 * {@code close(boolean, String)} are serialized on a private lock.</p>
 */
class ReactorExecutor implements Closeable {
    private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";
    private static final String STOP_REASON =
        "Stopping the reactor because thread was interrupted or the reactor has no more events to process.";

    private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
    private final AtomicBoolean hasStarted = new AtomicBoolean();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Object lock = new Object();
    private final Reactor reactor;
    private final Scheduler scheduler;
    private final String connectionId;
    private final Duration timeout;
    private final AmqpExceptionHandler exceptionHandler;
    private final String hostname;

    /**
     * Creates an executor for the given reactor.
     *
     * @param reactor Reactor whose events are processed.
     * @param scheduler Scheduler on which each processing pass is scheduled.
     * @param connectionId Identifier included in every log message emitted by this instance.
     * @param exceptionHandler Receives connection errors and shutdown signals.
     * @param timeout Delay applied before pending tasks are drained and the reactor is freed.
     * @param hostname Passed to {@link AmqpErrorContext} when an error is reported. Not validated here.
     * @throws NullPointerException if {@code reactor}, {@code scheduler}, {@code connectionId},
     *     {@code exceptionHandler} or {@code timeout} is null.
     */
    ReactorExecutor(Reactor reactor, Scheduler scheduler, String connectionId, AmqpExceptionHandler exceptionHandler,
        Duration timeout, String hostname) {
        this.reactor = Objects.requireNonNull(reactor, "'reactor' cannot be null.");
        this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");
        this.connectionId = Objects.requireNonNull(connectionId, "'connectionId' cannot be null.");
        this.exceptionHandler = Objects.requireNonNull(exceptionHandler, "'exceptionHandler' cannot be null.");
        this.timeout = Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        this.hostname = hostname;
    }

    /**
     * Starts the reactor and schedules the first processing pass. Calling this while the executor is
     * already started only logs a warning.
     */
    void start() {
        // getAndSet makes "check whether started" and "mark as started" a single atomic step, so two
        // concurrent callers cannot both start the reactor and schedule two run loops.
        if (hasStarted.getAndSet(true)) {
            logger.warning("ReactorExecutor has already started.");
            return;
        }

        logger.info(LOG_MESSAGE, connectionId, "Starting reactor.");

        synchronized (lock) {
            reactor.start();
        }

        scheduler.schedule(this::run);
    }

    /**
     * Performs one processing pass on the reactor and, if the reactor reports more work, schedules the next
     * pass. When no further pass was scheduled, the cleanup in the {@code finally} block runs exactly once,
     * regardless of whether the pass completed normally or threw.
     */
    private void run() {
        // Not disposed and not started means start() was never invoked; there is nothing to process.
        if (!isDisposed.get() && !hasStarted.get()) {
            logger.warning(
                "Cannot run work items on ReactorExecutor if ReactorExecutor.start() has not been invoked.");
            return;
        }

        boolean rescheduledReactor = false;

        try {
            final boolean shouldReschedule;
            synchronized (lock) {
                // Thread.interrupted() also clears the interrupt flag of the current thread.
                shouldReschedule = hasStarted.get() && !Thread.interrupted() && reactor.process();
            }

            if (shouldReschedule) {
                rescheduledReactor = tryReschedule();
            }
        } catch (HandlerException handlerException) {
            reportReactorError(handlerException);
        } finally {
            // A finally block (rather than duplicated tails) guarantees this runs once on every exit path,
            // including throwables other than HandlerException, which still propagate to the caller.
            if (!rescheduledReactor) {
                if (hasStarted.get()) {
                    scheduleCompletePendingTasks();
                } else {
                    logger.info(LOG_MESSAGE, connectionId, STOP_REASON);
                    close(false, STOP_REASON);
                }
            }
        }
    }

    /**
     * Schedules the next {@link #run()} pass.
     *
     * @return {@code true} if the pass was scheduled; {@code false} if the scheduler rejected it, in which
     *     case the rejection is logged and stored in the reactor's attachments.
     */
    private boolean tryReschedule() {
        try {
            scheduler.schedule(this::run);
            return true;
        } catch (RejectedExecutionException exception) {
            logger.warning(LOG_MESSAGE, connectionId, StringUtil.toStackTraceString(exception,
                "Scheduling reactor failed because the executor has been shut down"));

            reactor.attachments().set(RejectedExecutionException.class, RejectedExecutionException.class, exception);
            return false;
        }
    }

    /**
     * Logs a {@link HandlerException} thrown while processing reactor events and forwards it to the
     * exception handler wrapped in an {@link AmqpException}.
     *
     * @param handlerException Exception thrown by the reactor.
     */
    private void reportReactorError(HandlerException handlerException) {
        final Throwable cause = handlerException.getCause() == null
            ? handlerException
            : handlerException.getCause();

        logger.warning(LOG_MESSAGE, connectionId, StringUtil.toStackTraceString(handlerException,
            "Unhandled exception while processing events in reactor, report this error."));

        // Prefer the cause's message, then the wrapper's message, then a generic fallback.
        final String message;
        if (!CoreUtils.isNullOrEmpty(cause.getMessage())) {
            message = cause.getMessage();
        } else if (!CoreUtils.isNullOrEmpty(handlerException.getMessage())) {
            message = handlerException.getMessage();
        } else {
            message = "Reactor encountered unrecoverable error";
        }

        final AmqpErrorContext errorContext = new AmqpErrorContext(hostname);
        final AmqpException exception;
        if (cause instanceof UnresolvedAddressException) {
            exception = new AmqpException(true,
                String.format(Locale.US,
                    "%s. This is usually caused by incorrect hostname or network configuration. Check correctness of namespace information. %s",
                    message, StringUtil.getTrackingIDAndTimeToLog()),
                cause, errorContext);
        } else {
            exception = new AmqpException(true,
                String.format(Locale.US, "%s, %s", message, StringUtil.getTrackingIDAndTimeToLog()),
                cause, errorContext);
        }

        exceptionHandler.onConnectionError(exception);
    }

    /**
     * Marks the executor as no longer started and schedules a task, passing {@code timeout} as the delay,
     * that stops the reactor, processes once more, and then frees the reactor.
     */
    private void scheduleCompletePendingTasks() {
        hasStarted.set(false);

        scheduler.schedule(() -> {
            logger.info(LOG_MESSAGE, connectionId, "Processing all pending tasks and closing old reactor.");
            try {
                reactor.stop();
                reactor.process();
            } catch (HandlerException e) {
                logger.warning(LOG_MESSAGE, connectionId, StringUtil.toStackTraceString(e,
                    "scheduleCompletePendingTasks - exception occurred while processing events."));
            } finally {
                // Free the reactor even when the final processing pass failed.
                reactor.free();
            }
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Disposes of this executor. Only the first invocation has an effect; it stops the reactor if it is
     * currently started.
     */
    @Override
    public void close() {
        if (!isDisposed.getAndSet(true)) {
            close(true, "ReactorExecutor.close() was called.");
        }
    }

    /**
     * Stops and frees the reactor if it is currently started, then notifies the exception handler with an
     * {@link AmqpShutdownSignal}. Does nothing when the executor is not started.
     *
     * @param isUserInitialized Forwarded as the second argument of the {@link AmqpShutdownSignal}
     *     constructor; {@code true} when invoked from {@link #close()}.
     * @param reason Forwarded as the third argument of the {@link AmqpShutdownSignal} constructor.
     */
    private void close(boolean isUserInitialized, String reason) {
        // getAndSet(false) lets only one caller perform the stop/free sequence.
        if (hasStarted.getAndSet(false)) {
            logger.info(LOG_MESSAGE, connectionId, "Stopping the reactor.");

            synchronized (lock) {
                reactor.stop();
                reactor.free();
            }

            exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, isUserInitialized, reason));
        }
    }
}

