/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.qpid.protonj2.engine.impl;

import com.rabbitmq.qpid.protonj2.buffer.ProtonBuffer;
import com.rabbitmq.qpid.protonj2.engine.AMQPPerformativeEnvelopePool;
import com.rabbitmq.qpid.protonj2.engine.ConnectionState;
import com.rabbitmq.qpid.protonj2.engine.Engine;
import com.rabbitmq.qpid.protonj2.engine.EnginePipeline;
import com.rabbitmq.qpid.protonj2.engine.EngineSaslDriver;
import com.rabbitmq.qpid.protonj2.engine.EngineState;
import com.rabbitmq.qpid.protonj2.engine.EventHandler;
import com.rabbitmq.qpid.protonj2.engine.HeaderEnvelope;
import com.rabbitmq.qpid.protonj2.engine.OutgoingAMQPEnvelope;
import com.rabbitmq.qpid.protonj2.engine.Scheduler;
import com.rabbitmq.qpid.protonj2.engine.exceptions.EngineFailedException;
import com.rabbitmq.qpid.protonj2.engine.exceptions.EngineNotStartedException;
import com.rabbitmq.qpid.protonj2.engine.exceptions.EngineNotWritableException;
import com.rabbitmq.qpid.protonj2.engine.exceptions.EngineShutdownException;
import com.rabbitmq.qpid.protonj2.engine.exceptions.EngineStartedException;
import com.rabbitmq.qpid.protonj2.engine.exceptions.EngineStateException;
import com.rabbitmq.qpid.protonj2.engine.exceptions.IdleTimeoutException;
import com.rabbitmq.qpid.protonj2.engine.exceptions.ProtonExceptionSupport;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonConnection;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonEngineConfiguration;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonEngineNoOpSaslDriver;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonEnginePipeline;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonEnginePipelineProxy;
import com.rabbitmq.qpid.protonj2.logging.ProtonLogger;
import com.rabbitmq.qpid.protonj2.logging.ProtonLoggerFactory;
import com.rabbitmq.qpid.protonj2.types.Symbol;
import com.rabbitmq.qpid.protonj2.types.transport.ErrorCondition;
import com.rabbitmq.qpid.protonj2.types.transport.Performative;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
 * {@link Engine} implementation that owns a single {@link ProtonConnection}, the
 * {@link ProtonEnginePipeline} that reads and writes flow through, and the idle
 * timeout bookkeeping driven by {@link #tick(long)} or {@code tickAuto(...)}.
 *
 * <p>Several state checks compare {@link EngineState#ordinal()} values and therefore
 * rely on the declaration order of {@link EngineState}.
 *
 * <p>NOTE(review): no field here is synchronized or volatile; callers presumably
 * confine all access to a single thread - confirm against the Engine contract.
 */
public class ProtonEngine
implements Engine {
    private static final ProtonLogger LOG = ProtonLoggerFactory.getLogger(ProtonEngine.class);

    /**
     * Bytes written when the remote idle deadline elapses with no other output.
     * Read as an AMQP frame header this is size=8, doff=2, type=0, channel=0 with no body.
     */
    private static final byte[] EMPTY_FRAME_BUFFER = new byte[]{0, 0, 0, 8, 2, 0, 0, 0};

    // Declaration order matters: the proxy wraps the pipeline, and each collaborator receives this engine.
    private final ProtonEnginePipeline pipeline = new ProtonEnginePipeline(this);
    private final ProtonEnginePipelineProxy pipelineProxy = new ProtonEnginePipelineProxy(this.pipeline);
    private final ProtonEngineConfiguration configuration = new ProtonEngineConfiguration(this);
    private final ProtonConnection connection = new ProtonConnection(this);
    private final AMQPPerformativeEnvelopePool<OutgoingAMQPEnvelope> framePool = AMQPPerformativeEnvelopePool.outgoingEnvelopePool();

    private EngineSaslDriver saslDriver = new ProtonEngineNoOpSaslDriver();
    private boolean writable;
    private EngineState state = EngineState.IDLE;
    private Throwable failureCause;

    // Counters bumped on every ingest / dispatched write; the idle checks compare them to the last seen values.
    private int inputSequence;
    private int outputSequence;

    private Future<?> nextIdleTimeoutCheck;
    private Scheduler idleTimeoutExecutor;
    private int lastInputSequence;
    private int lastOutputSequence;

    // A value of 0 means "no deadline computed yet"; computeDeadline never returns 0.
    private long localIdleDeadline = 0L;
    private long remoteIdleDeadline = 0L;

    private BiConsumer<ProtonBuffer, Runnable> outputHandler;
    private EventHandler<Engine> engineShutdownHandler;
    private EventHandler<Engine> engineFailureHandler = engine -> LOG.warn("Engine encountered error and will become inoperable: ", (Object)engine.failureCause());

    /** @return the connection owned by this engine. */
    @Override
    public ProtonConnection connection() {
        return this.connection;
    }

    /** @return {@code true} between a successful {@link #start()} and shutdown or failure. */
    @Override
    public boolean isWritable() {
        return this.writable;
    }

    /** @return {@code true} only while the state is {@link EngineState#STARTED}. */
    @Override
    public boolean isRunning() {
        return this.state == EngineState.STARTED;
    }

    /** @return {@code true} once the state's ordinal has reached {@link EngineState#SHUTDOWN}. */
    @Override
    public boolean isShutdown() {
        return this.state.ordinal() >= EngineState.SHUTDOWN.ordinal();
    }

    /** @return {@code true} when a failure cause has been recorded. */
    @Override
    public boolean isFailed() {
        return this.failureCause != null;
    }

    /** @return the cause recorded by {@link #engineFailed(Throwable)}, or {@code null} if none. */
    @Override
    public Throwable failureCause() {
        return this.failureCause;
    }

    /** @return the current engine state. */
    @Override
    public EngineState state() {
        return this.state;
    }

    /**
     * Starts an idle engine: fires the pipeline "starting" event, marks the engine
     * STARTED and writable, then notifies the connection. Calling this on an engine
     * that is not IDLE (and not shut down or failed) is a no-op.
     *
     * @return the engine's connection
     * @throws EngineStateException if the engine is shut down or failed, or if start-up
     *         itself throws (the error is routed through {@link #engineFailed(Throwable)})
     */
    @Override
    public ProtonConnection start() throws EngineStateException {
        this.checkShutdownOrFailed("Cannot start an Engine that has already been shutdown or has failed.");
        if (this.state == EngineState.IDLE) {
            this.state = EngineState.STARTING;
            try {
                this.pipeline.fireEngineStarting();
                this.state = EngineState.STARTED;
                this.writable = true;
                this.connection.handleEngineStarted(this);
            }
            catch (Throwable error) {
                throw this.engineFailed(error);
            }
        }
        return this.connection;
    }

    /**
     * Shuts the engine down if its state is still ordered before SHUTTING_DOWN:
     * marks it SHUTDOWN and non-writable, cancels any pending idle check, and
     * notifies the pipeline, the connection and finally the shutdown handler (if set).
     * Subsequent calls are no-ops.
     *
     * @return this engine
     */
    @Override
    public ProtonEngine shutdown() {
        if (this.state.ordinal() < EngineState.SHUTTING_DOWN.ordinal()) {
            this.state = EngineState.SHUTDOWN;
            this.writable = false;
            this.cancelIdleTimeoutCheck();
            try {
                this.pipeline.fireEngineStateChanged();
            }
            catch (Exception ignored) {
                // Best effort: a misbehaving pipeline handler must not prevent shutdown.
            }
            try {
                this.connection.handleEngineShutdown(this);
            }
            catch (Exception ignored) {
                // Best effort: the shutdown handler below must still be signalled.
            }
            finally {
                if (this.engineShutdownHandler != null) {
                    this.engineShutdownHandler.handle(this);
                }
            }
        }
        return this;
    }

    /**
     * Performs one manual idle-timeout pass (read check, then write check).
     *
     * @param currentTime the caller's current time; deadlines are computed relative to it
     * @return the nearer of the local and remote idle deadlines, or 0 if neither is set
     * @throws IllegalStateException if the connection is not ACTIVE or auto ticking was started
     * @throws EngineStateException if the engine is shut down or failed
     */
    @Override
    public long tick(long currentTime) throws IllegalStateException, EngineStateException {
        this.checkShutdownOrFailed("Cannot tick an Engine that has been shutdown or failed.");
        if (this.connection.getState() != ConnectionState.ACTIVE) {
            throw new IllegalStateException("Cannot tick on a Connection that is not opened or an engine that has been shut down.");
        }
        if (this.idleTimeoutExecutor != null) {
            throw new IllegalStateException("Automatic ticking previously initiated.");
        }
        this.performReadCheck(currentTime);
        this.performWriteCheck(currentTime);
        return ProtonEngine.nextTickDeadline(this.localIdleDeadline, this.remoteIdleDeadline);
    }

    /**
     * Starts automatic idle-timeout checking using the given executor service, adapting
     * it to a {@link Scheduler} and delegating to {@link #tickAuto(Scheduler)}.
     *
     * @param executor the executor that runs the periodic checks; must not be {@code null}
     * @return this engine
     * @throws NullPointerException if {@code executor} is {@code null}
     * @throws IllegalStateException if the connection is not ACTIVE or auto ticking was already started
     * @throws EngineStateException if the engine is shut down or failed
     */
    @Override
    public ProtonEngine tickAuto(final ScheduledExecutorService executor) throws IllegalStateException, EngineStateException {
        // State is checked first so a dead engine still reports its state error, as tickAuto(Scheduler) does.
        this.checkShutdownOrFailed("Cannot start auto tick on an Engine that has been shutdown or failed");
        // Fail fast: wrapping a null service would slip past the null check on the Scheduler overload
        // and only blow up after idleTimeoutExecutor had already been assigned.
        Objects.requireNonNull(executor, "The executor to use for auto tick cannot be null");
        return this.tickAuto(new Scheduler(){
            private final ScheduledExecutorService service = executor;

            @Override
            public boolean isShutdown() {
                return this.service.isShutdown();
            }

            @Override
            public void execute(Runnable command) {
                this.service.execute(command);
            }

            @Override
            public Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
                return this.service.schedule(command, delay, unit);
            }

            @Override
            public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
                return this.service.schedule(task, delay, unit);
            }

            @Override
            public Future<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
                return this.service.scheduleAtFixedRate(command, initialDelay, period, unit);
            }

            @Override
            public Future<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
                return this.service.scheduleWithFixedDelay(command, initialDelay, delay, unit);
            }
        });
    }

    /**
     * Starts automatic idle-timeout checking: records the scheduler and submits the
     * first {@link IdleTimeoutCheck}, which reschedules itself while checks are needed.
     *
     * @param executor the scheduler that runs the periodic checks; must not be {@code null}
     * @return this engine
     * @throws NullPointerException if {@code executor} is {@code null}
     * @throws IllegalStateException if the connection is not ACTIVE or auto ticking was already started
     * @throws EngineStateException if the engine is shut down or failed
     */
    @Override
    public ProtonEngine tickAuto(Scheduler executor) throws IllegalStateException, EngineStateException {
        this.checkShutdownOrFailed("Cannot start auto tick on an Engine that has been shutdown or failed");
        Objects.requireNonNull(executor);
        if (this.connection.getState() != ConnectionState.ACTIVE) {
            throw new IllegalStateException("Cannot tick on a Connection that is not opened.");
        }
        if (this.idleTimeoutExecutor != null) {
            throw new IllegalStateException("Automatic ticking previously initiated.");
        }
        LOG.trace("Auto Idle Timeout Check being initiated");
        this.idleTimeoutExecutor = executor;
        this.idleTimeoutExecutor.execute(new IdleTimeoutCheck());
        return this;
    }

    /**
     * Feeds readable input into the pipeline. The input sequence counter is advanced
     * whenever a readable buffer was offered, even if processing it failed.
     *
     * @param input the bytes to process; a buffer with nothing readable is ignored
     * @return this engine
     * @throws EngineNotWritableException if the engine is not currently writable
     * @throws EngineStateException if the engine is shut down or failed, or processing fails
     */
    @Override
    public ProtonEngine ingest(ProtonBuffer input) throws EngineStateException {
        this.checkShutdownOrFailed("Cannot ingest data into an Engine that has been shutdown or failed");
        if (!this.isWritable()) {
            throw new EngineNotWritableException("Engine is currently not accepting new input");
        }
        if (input.isReadable()) {
            try {
                this.pipeline.fireRead(input);
            }
            catch (Exception error) {
                throw this.engineFailed(error);
            }
            finally {
                ++this.inputSequence;
            }
        }
        return this;
    }

    /**
     * Transitions the engine to FAILED (unless it is already failed or its state is
     * ordered at or after SHUTTING_DOWN), records the cause, cancels idle checking and
     * notifies the pipeline, the connection and the error handler (if one is set).
     *
     * @param cause the error that made the engine inoperable
     * @return an exception describing the failure (or the shutdown, when the engine was
     *         already shut down without a recorded failure) for the caller to throw
     */
    @Override
    public EngineStateException engineFailed(Throwable cause) {
        EngineStateException failure;
        if (this.state.ordinal() < EngineState.SHUTTING_DOWN.ordinal() && this.state != EngineState.FAILED) {
            this.state = EngineState.FAILED;
            this.failureCause = cause;
            this.writable = false;
            this.cancelIdleTimeoutCheck();
            failure = ProtonExceptionSupport.createFailedException(cause);
            try {
                this.pipeline.fireFailed((EngineFailedException)failure);
            }
            catch (Exception ignored) {
                // Best effort: remaining listeners must still learn about the failure.
            }
            try {
                this.connection.handleEngineFailed(this, cause);
            }
            catch (Exception ignored) {
                // Best effort: the error handler below must still be signalled.
            }
            // errorHandler(...) accepts null, so guard exactly as shutdown() guards its handler.
            if (this.engineFailureHandler != null) {
                this.engineFailureHandler.handle(this);
            }
        } else {
            failure = this.isFailed() ? ProtonExceptionSupport.createFailedException(cause) : new EngineShutdownException("Engine has transitioned to shutdown state");
        }
        return failure;
    }

    /**
     * Sets the consumer that receives outgoing buffers and their completion callbacks.
     *
     * @param handler the output consumer; writes fail the engine while this is {@code null}
     * @return this engine
     */
    @Override
    public ProtonEngine outputHandler(BiConsumer<ProtonBuffer, Runnable> handler) {
        this.outputHandler = handler;
        return this;
    }

    /** @return the configured output consumer, possibly {@code null}. */
    BiConsumer<ProtonBuffer, Runnable> outputHandler() {
        return this.outputHandler;
    }

    /**
     * Replaces the handler signalled when the engine fails (the default logs a warning).
     *
     * @param handler the new failure handler; {@code null} disables the notification
     * @return this engine
     */
    @Override
    public ProtonEngine errorHandler(EventHandler<Engine> handler) {
        this.engineFailureHandler = handler;
        return this;
    }

    /** @return the configured failure handler, possibly {@code null}. */
    EventHandler<Engine> errorHandler() {
        return this.engineFailureHandler;
    }

    /**
     * Sets the handler signalled at the end of {@link #shutdown()}.
     *
     * @param handler the shutdown handler, or {@code null} for none
     * @return this engine
     */
    @Override
    public ProtonEngine shutdownHandler(EventHandler<Engine> handler) {
        this.engineShutdownHandler = handler;
        return this;
    }

    /** @return the configured shutdown handler, possibly {@code null}. */
    EventHandler<Engine> engineShutdownHandler() {
        return this.engineShutdownHandler;
    }

    /** @return the proxy wrapping this engine's pipeline (not the pipeline itself). */
    @Override
    public EnginePipeline pipeline() {
        return this.pipelineProxy;
    }

    /** @return this engine's configuration object. */
    @Override
    public ProtonEngineConfiguration configuration() {
        return this.configuration;
    }

    /** @return the current SASL driver; a no-op driver until one is registered. */
    @Override
    public EngineSaslDriver saslDriver() {
        return this.saslDriver;
    }

    /**
     * Installs the SASL driver; only permitted while the state is IDLE or STARTING.
     *
     * @param saslDriver the driver to install
     * @throws EngineStartedException if the engine has already been started
     * @throws EngineStateException if the engine is shut down or failed
     */
    public void registerSaslDriver(EngineSaslDriver saslDriver) throws EngineStateException {
        this.checkShutdownOrFailed("Cannot register a SASL driver on an Engine that is shutdown or failed.");
        if (this.state.ordinal() > EngineState.STARTING.ordinal()) {
            throw new EngineStartedException("Cannot alter SASL driver after Engine has been started.");
        }
        this.saslDriver = saslDriver;
    }

    /** Writes a header envelope into the pipeline and returns this engine. */
    ProtonEngine fireWrite(HeaderEnvelope frame) {
        this.pipeline.fireWrite(frame);
        return this;
    }

    /** Writes an already wrapped outgoing envelope into the pipeline and returns this engine. */
    ProtonEngine fireWrite(OutgoingAMQPEnvelope frame) {
        this.pipeline.fireWrite(frame);
        return this;
    }

    /** Wraps the performative (no payload) in a pooled envelope and writes it into the pipeline. */
    ProtonEngine fireWrite(Performative performative, int channel) {
        this.pipeline.fireWrite(this.framePool.take(performative, channel, null));
        return this;
    }

    /** Wraps the performative and payload in a pooled envelope and writes it into the pipeline. */
    ProtonEngine fireWrite(Performative performative, int channel, ProtonBuffer payload) {
        this.pipeline.fireWrite(this.framePool.take(performative, channel, payload));
        return this;
    }

    /** Takes an envelope from the pool for the given performative, channel and payload without writing it. */
    OutgoingAMQPEnvelope wrap(Performative performative, int channel, ProtonBuffer payload) {
        return this.framePool.take(performative, channel, payload);
    }

    /**
     * @param message the text for the thrown exception
     * @throws EngineNotStartedException if the engine is still IDLE
     */
    void checkEngineNotStarted(String message) {
        if (this.state == EngineState.IDLE) {
            throw new EngineNotStartedException(message);
        }
    }

    /**
     * Throws a failed-engine exception (built from {@code message} and the recorded
     * cause) when the engine state is FAILED; otherwise returns normally.
     */
    void checkFailed(String message) {
        if (this.state == EngineState.FAILED) {
            throw ProtonExceptionSupport.createFailedException(message, this.failureCause);
        }
    }

    /**
     * Throws when the state is ordered after STARTED: a failed-engine exception if a
     * failure cause was recorded, otherwise an {@link EngineShutdownException}.
     */
    void checkShutdownOrFailed(String message) {
        if (this.state.ordinal() > EngineState.STARTED.ordinal()) {
            if (this.isFailed()) {
                throw ProtonExceptionSupport.createFailedException(message, this.failureCause);
            }
            throw new EngineShutdownException(message);
        }
    }

    /**
     * Hands an outgoing buffer to the configured output handler, counting the write
     * for the remote idle check. A missing handler or a handler error fails the engine.
     *
     * @param buffer the bytes to emit
     * @param ioComplete callback passed through to the output handler
     */
    void dispatchWriteToEventHandler(ProtonBuffer buffer, Runnable ioComplete) {
        if (this.outputHandler == null) {
            throw this.engineFailed(new IllegalStateException("No output handler configured"));
        }
        ++this.outputSequence;
        try {
            this.outputHandler.accept(buffer, ioComplete);
        }
        catch (Throwable error) {
            throw this.engineFailed(error);
        }
    }

    /** Cancels (without interrupting) and forgets the pending auto idle check, if any. */
    private void cancelIdleTimeoutCheck() {
        if (this.nextIdleTimeoutCheck != null) {
            LOG.trace("Canceling scheduled Idle Timeout Check");
            this.nextIdleTimeoutCheck.cancel(false);
            this.nextIdleTimeoutCheck = null;
        }
    }

    /**
     * Local idle check: if input arrived since the last check (or no deadline exists)
     * the deadline is pushed out; if the deadline passed with no input, the connection
     * is closed with a resource-limit-exceeded condition and the engine is failed.
     */
    private void performReadCheck(long currentTime) {
        long localIdleTimeout = this.connection.getIdleTimeout();
        if (localIdleTimeout <= 0L) {
            return;
        }
        if (this.localIdleDeadline == 0L || this.lastInputSequence != this.inputSequence) {
            this.localIdleDeadline = this.computeDeadline(currentTime, localIdleTimeout);
            this.lastInputSequence = this.inputSequence;
        } else if (this.localIdleDeadline - currentTime <= 0L) {
            // Subtraction-based comparison stays meaningful even if the time values wrap around.
            if (this.connection.getState() != ConnectionState.CLOSED) {
                ErrorCondition condition = new ErrorCondition(Symbol.getSymbol("amqp:resource-limit-exceeded"), "local-idle-timeout expired");
                this.connection.setCondition(condition);
                this.connection.close();
                // Return value intentionally unused: the engine is marked failed, nothing is thrown here.
                this.engineFailed(new IdleTimeoutException("Remote idle timeout detected"));
            } else {
                this.localIdleDeadline = this.computeDeadline(currentTime, localIdleTimeout);
            }
        }
    }

    /**
     * Remote idle check: deadlines are set at half the remote's idle timeout; if one
     * passes with no output since the last check, an empty frame is written.
     */
    private void performWriteCheck(long currentTime) {
        long remoteIdleTimeout = this.connection.getRemoteIdleTimeout();
        if (remoteIdleTimeout <= 0L || this.connection.isLocallyClosed()) {
            return;
        }
        if (this.remoteIdleDeadline == 0L || this.lastOutputSequence != this.outputSequence) {
            this.remoteIdleDeadline = this.computeDeadline(currentTime, remoteIdleTimeout / 2L);
            this.lastOutputSequence = this.outputSequence;
        } else if (this.remoteIdleDeadline - currentTime <= 0L) {
            this.remoteIdleDeadline = this.computeDeadline(currentTime, remoteIdleTimeout / 2L);
            this.pipeline.fireWrite(this.configuration.getBufferAllocator().copy(EMPTY_FRAME_BUFFER).convertToReadOnly(), null);
            // Presumably keeps the keep-alive write itself from registering as fresh output on the
            // next check (assuming it reaches dispatchWriteToEventHandler) - TODO confirm.
            ++this.lastOutputSequence;
        }
    }

    /**
     * @return {@code now + timeout}, substituting 1 when the sum is exactly 0 because
     *         0 is reserved to mean "no deadline set"
     */
    private long computeDeadline(long now, long timeout) {
        long deadline = now + timeout;
        return deadline != 0L ? deadline : 1L;
    }

    /**
     * Picks the deadline to report: the only one that is set, or, when both are set,
     * the remote deadline if it is not after the local one, else the local deadline.
     * Returns 0 when neither is set.
     */
    private static long nextTickDeadline(long localIdleDeadline, long remoteIdleDeadline) {
        if (localIdleDeadline == 0L) {
            return remoteIdleDeadline;
        }
        if (remoteIdleDeadline == 0L) {
            return localIdleDeadline;
        }
        return remoteIdleDeadline - localIdleDeadline <= 0L ? remoteIdleDeadline : localIdleDeadline;
    }

    /**
     * Self-rescheduling task behind {@code tickAuto}: runs the read and write checks
     * and schedules itself again while the connection is ACTIVE and the engine STARTED.
     */
    private final class IdleTimeoutCheck
    implements Runnable {
        /** Lower bound, in milliseconds, on the delay before the next check. */
        private static final long MIN_IDLE_CHECK_INTERVAL = 1000L;
        /** Upper bound, in milliseconds, on the delay before the next check. */
        private static final long MAX_IDLE_CHECK_INTERVAL = 10000L;

        private IdleTimeoutCheck() {
        }

        @Override
        public void run() {
            boolean checkScheduled = false;
            if (ProtonEngine.this.connection.getState() == ConnectionState.ACTIVE && !ProtonEngine.this.isShutdown()) {
                long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
                try {
                    ProtonEngine.this.performReadCheck(now);
                    ProtonEngine.this.performWriteCheck(now);
                    long deadline = ProtonEngine.nextTickDeadline(ProtonEngine.this.localIdleDeadline, ProtonEngine.this.remoteIdleDeadline);
                    if (deadline != 0L && ProtonEngine.this.connection.getState() == ConnectionState.ACTIVE && ProtonEngine.this.state() == EngineState.STARTED) {
                        // Wake up halfway to the deadline, clamped to the [MIN, MAX] interval.
                        long delay = (deadline - now) / 2L;
                        delay = Math.max(MIN_IDLE_CHECK_INTERVAL, delay);
                        delay = Math.min(MAX_IDLE_CHECK_INTERVAL, delay);
                        LOG.trace("IdleTimeoutCheck rescheduling with delay: {}", (Object)delay);
                        ProtonEngine.this.nextIdleTimeoutCheck = ProtonEngine.this.idleTimeoutExecutor.schedule(this, delay, TimeUnit.MILLISECONDS);
                        // Only flag success once scheduling worked, so a rejected schedule
                        // falls through to the cleanup below instead of keeping a stale future.
                        checkScheduled = true;
                    }
                }
                catch (Throwable t) {
                    LOG.trace("Auto Idle Timeout Check encountered error during check: ", (Object)t);
                }
            }
            if (!checkScheduled) {
                ProtonEngine.this.nextIdleTimeoutCheck = null;
                LOG.trace("Auto Idle Timeout Check task exiting and will not be rescheduled");
            }
        }
    }
}

