/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.proton.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.Events;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger;

public class ProtonHandler
extends ProtonInitializable {
    /** Logger shared by all instances of this class. */
    private static final Logger log = Logger.getLogger(ProtonHandler.class);
    /**
     * Byte value 3. {@code inputBuffer} compares the byte at index 4 of the first packet
     * against this value (as a literal) and, on a match, calls {@code dispatchAuth(true)}.
     */
    private static final byte SASL = 3;
    /**
     * Byte value 0. {@code inputBuffer} compares the byte at index 4 of the first packet
     * against this value (as a literal) and, on a match, calls {@code dispatchAuth(false)}.
     */
    private static final byte BARE = 0;
    /** Proton transport; bound to {@link #connection} in the constructor. */
    private final Transport transport = Proton.transport();
    /** Proton connection whose events are gathered by {@link #collector}. */
    private final Connection connection = Proton.connection();
    /** Event collector attached to {@link #connection} in the constructor and drained by {@code dispatch()}. */
    private final Collector collector = Proton.collector();
    /** Handlers registered through {@code addEventHandler}; iterated in registration order. */
    private List<EventHandler> handlers = new ArrayList<EventHandler>();
    /**
     * Server-side SASL layer; assigned by {@code createServerSASL} and reset to {@code null} by
     * {@code checkServerSASL} once a known mechanism has produced an outcome.
     */
    private Sasl serverSasl;
    /** Re-entrant lock taken around transport/connection access; also exposed via {@code lock()}/{@code unlock()}/{@code tryLock()}. */
    private final ReentrantLock lock = new ReentrantLock();
    /** {@code System.currentTimeMillis()} captured in the constructor. */
    private final long creationTime;
    /** SASL mechanisms keyed by name; built by {@code createServerSASL}, cleared and nulled after a successful authentication. */
    private Map<String, ServerSASL> saslHandlers;
    /** Result of the most recent {@code processSASL} call made by {@code checkServerSASL}; {@code null} until then. */
    private SASLResult saslResult;
    /** Set to {@code true} by {@code inputBuffer}; read and reset by {@code checkDataReceived}. */
    protected volatile boolean dataReceived;
    /** Becomes {@code true} once {@code inputBuffer} has inspected the first packet. */
    protected boolean receivedFirstPacket = false;
    /** Executor passed to the constructor. */
    private final Executor flushExecutor;
    /** Listener that schedules {@code flush()} on the flush executor; handed to each handler's {@code flowControl}. */
    protected final ReadyListener readyListener;
    /** Guard used by {@code dispatch()} to return early when a dispatch is already in progress. */
    boolean inDispatch = false;

    /**
     * Creates a handler, binds the transport to the connection and attaches the event
     * collector to the connection.
     *
     * @param flushExecutor executor on which {@link #flush()} is scheduled each time the
     *                      ready listener is invoked
     */
    public ProtonHandler(Executor flushExecutor) {
        this.flushExecutor = flushExecutor;
        // The listener only schedules the flush; the work itself runs on the executor.
        this.readyListener = () -> flushExecutor.execute(this::flush);
        this.creationTime = System.currentTimeMillis();
        transport.bind(connection);
        connection.collect(collector);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Ticks the Proton transport with the current monotonic time (in milliseconds) and then
     * flushes any pending output bytes.
     *
     * <p>On the first tick the transport is ticked unconditionally. On later ticks the
     * transport is only ticked while the connection's local state is not {@code CLOSED};
     * if the transport turns out to be closed after ticking, or any other exception is
     * raised, the failure is logged, the transport is closed and an empty
     * {@link ErrorCondition} is set on the connection.
     *
     * @param firstTick {@code true} for the initial tick, {@code false} for every later one
     * @return the value returned by the transport's {@code tick}, or {@code 0} when the
     *         connection is locally closed or the tick failed
     */
    public long tick(boolean firstTick) {
        lock.lock();
        try {
            if (firstTick) {
                return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
            }
            try {
                if (connection.getLocalState() != EndpointState.CLOSED) {
                    long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
                    if (transport.isClosed()) {
                        // Thrown only to reach the shared failure handling in the catch below.
                        throw new IllegalStateException("Channel was inactive for too long");
                    }
                    return rescheduleAt;
                }
            } catch (Exception e) {
                log.warn(e.getMessage(), e);
                transport.close();
                connection.setCondition(new ErrorCondition());
            }
            return 0L;
        } finally {
            // Release the lock first; flushBytes() takes it again itself.
            lock.unlock();
            flushBytes();
        }
    }

    /**
     * Reads the transport's current input capacity while holding the handler lock.
     *
     * @return the value reported by the transport's {@code capacity()}
     */
    public int capacity() {
        lock.lock();
        try {
            return transport.capacity();
        } finally {
            lock.unlock();
        }
    }

    /** Acquires this handler's re-entrant lock; pair every call with {@link #unlock()}. */
    public void lock() {
        lock.lock();
    }

    /** Releases one hold on this handler's re-entrant lock. */
    public void unlock() {
        lock.unlock();
    }

    /**
     * Attempts to acquire the handler lock within the given time.
     *
     * @param time     maximum time to wait
     * @param timeUnit unit of {@code time}
     * @return {@code true} if the lock was acquired; {@code false} if the wait timed out or
     *         the thread was interrupted (the interrupt flag is restored in that case)
     */
    public boolean tryLock(long time, TimeUnit timeUnit) {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(time, timeUnit);
        } catch (InterruptedException interrupted) {
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();
        }
        return acquired;
    }

    /**
     * @return the Proton transport owned by this handler
     */
    public Transport getTransport() {
        return transport;
    }

    /**
     * @return the Proton connection owned by this handler
     */
    public Connection getConnection() {
        return connection;
    }

    /**
     * Appends a handler to the list that receives events, flow-control queries and output bytes.
     *
     * @param handler the handler to register
     * @return this instance, so calls can be chained
     */
    public ProtonHandler addEventHandler(EventHandler handler) {
        handlers.add(handler);
        return this;
    }

    /**
     * Switches the transport's SASL layer into server mode and advertises the given mechanisms.
     *
     * <p>The mechanisms are also indexed by name so that {@code checkServerSASL} can look up
     * the one selected by the remote peer.
     *
     * @param handlers the SASL mechanisms to offer, in the order they are advertised
     */
    public void createServerSASL(ServerSASL[] handlers) {
        serverSasl = transport.sasl();
        saslHandlers = new HashMap<String, ServerSASL>();
        final String[] mechanismNames = new String[handlers.length];
        for (int i = 0; i < handlers.length; i++) {
            final ServerSASL mechanism = handlers[i];
            saslHandlers.put(mechanism.getName(), mechanism);
            mechanismNames[i] = mechanism.getName();
        }
        serverSasl.server();
        serverSasl.setMechanisms(mechanismNames);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Copies all bytes the transport has pending for output into pooled direct buffers and
     * hands each buffer to every registered handler.
     *
     * <p>Nothing is written if any handler's {@code flowControl} returns {@code false}; the
     * ready listener passed to it schedules a later {@link #flush()}.
     */
    public void flushBytes() {
        for (EventHandler handler : handlers) {
            if (!handler.flowControl(readyListener)) {
                return;
            }
        }
        lock.lock();
        try {
            for (int pending = transport.pending(); pending > 0; pending = transport.pending()) {
                // NOTE(review): the buffer is not released here; presumably the handlers take
                // ownership of it in pushBytes - confirm against the EventHandler implementations.
                ByteBuf outgoing = PooledByteBufAllocator.DEFAULT.directBuffer(pending);
                ByteBuffer head = transport.head();
                outgoing.writeBytes(head);
                for (EventHandler handler : handlers) {
                    handler.pushBytes(outgoing);
                }
                // Tell the transport the copied bytes have been consumed.
                transport.pop(pending);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the result of the last SASL exchange processed by {@code checkServerSASL},
     *         or {@code null} if none has been processed
     */
    public SASLResult getSASLResult() {
        return saslResult;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Feeds incoming bytes into the Proton transport, processing them chunk by chunk.
     *
     * <p>On the first packet the byte at index 4 is inspected: if it equals {@link #SASL} or
     * {@link #BARE}, the handlers are notified through {@code dispatchAuth} before any data
     * is handed to the transport. Any failure during that inspection is logged and otherwise
     * ignored. Reading stops early, leaving the remaining bytes unread, as soon as the
     * transport reports a capacity of zero or less.
     *
     * @param buffer the received bytes; its reader index is advanced by the amount consumed
     */
    public void inputBuffer(ByteBuf buffer) {
        dataReceived = true;
        lock.lock();
        try {
            while (buffer.readableBytes() > 0) {
                int capacity = transport.capacity();
                if (!receivedFirstPacket) {
                    try {
                        // NOTE(review): getByte takes an absolute index, so this presumably
                        // relies on the header starting at index 0 of the buffer - confirm.
                        final byte auth = buffer.getByte(4);
                        final boolean sasl = auth == SASL;
                        if (sasl || auth == BARE) {
                            dispatchAuth(sasl);
                            // Re-read: the handlers may have changed the transport's capacity.
                            capacity = transport.capacity();
                        }
                    } catch (Throwable e) {
                        log.warn((Object) e.getMessage(), e);
                    }
                    receivedFirstPacket = true;
                }
                if (capacity <= 0) {
                    if (capacity == 0) {
                        log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
                    } else {
                        log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), this.transport.capacity());
                    }
                    break;
                }
                // Copy at most 'capacity' bytes into the transport's input buffer, then process them.
                ByteBuffer tail = transport.tail();
                tail.limit(Math.min(capacity, buffer.readableBytes()));
                buffer.readBytes(tail);
                flush();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reports whether {@code inputBuffer} has been called since the previous check and
     * resets the flag.
     *
     * <p>The read and the reset are two separate steps on a volatile field, not one atomic
     * operation.
     *
     * @return the value of the data-received flag before it was reset
     */
    public boolean checkDataReceived() {
        final boolean receivedSinceLastCheck = dataReceived;
        dataReceived = false;
        return receivedSinceLastCheck;
    }

    /**
     * @return the {@code System.currentTimeMillis()} value captured when this handler was constructed
     */
    public long getCreationTime() {
        return creationTime;
    }

    /**
     * Lets the transport process its input, advances any pending server-side SASL exchange,
     * and then dispatches the collected events to the handlers.
     */
    public void flush() {
        lock.lock();
        try {
            transport.process();
            checkServerSASL();
        } finally {
            lock.unlock();
        }
        // dispatch() acquires the lock on its own.
        dispatch();
    }

    /**
     * Closes the connection, optionally recording why, and flushes the result.
     *
     * @param errorCondition condition to set on the connection before closing it, or
     *                       {@code null} to close without setting one
     */
    public void close(ErrorCondition errorCondition) {
        lock.lock();
        try {
            if (errorCondition != null) {
                connection.setCondition(errorCondition);
            }
            connection.close();
        } finally {
            lock.unlock();
        }
        flush();
    }

    /**
     * Advances the server-side SASL exchange, if one is active and the remote peer has
     * announced a mechanism.
     *
     * <p>The first remote mechanism is looked up among the registered handlers. If it is
     * unknown, the exchange is marked {@code PN_SASL_SYS}. Otherwise the pending SASL data is
     * read and passed to the mechanism; the outcome is {@code PN_SASL_OK} when the result is
     * non-null and successful, {@code PN_SASL_AUTH} otherwise. After a known mechanism has
     * produced an outcome the SASL layer reference is dropped, and on success the mechanism
     * table is cleared and dropped as well.
     */
    protected void checkServerSASL() {
        if (serverSasl == null || serverSasl.getRemoteMechanisms().length == 0) {
            return;
        }
        ServerSASL mechanism = saslHandlers.get(serverSasl.getRemoteMechanisms()[0]);
        if (mechanism == null) {
            // The peer chose a mechanism this handler was never configured with.
            serverSasl.done(Sasl.SaslOutcome.PN_SASL_SYS);
            return;
        }
        byte[] dataSASL = new byte[serverSasl.pending()];
        serverSasl.recv(dataSASL, 0, dataSASL.length);
        if (log.isTraceEnabled()) {
            log.trace("Working on sasl::" + (dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous"));
        }
        saslResult = mechanism.processSASL(dataSASL);
        final boolean authenticated = saslResult != null && saslResult.isSuccess();
        serverSasl.done(authenticated ? Sasl.SaslOutcome.PN_SASL_OK : Sasl.SaslOutcome.PN_SASL_AUTH);
        // Either way the exchange is over for this mechanism; stop re-checking on later flushes.
        serverSasl = null;
        if (authenticated) {
            saslHandlers.clear();
            saslHandlers = null;
        }
    }

    /**
     * Notifies every registered handler that authentication is starting.
     *
     * @param sasl value forwarded unchanged as the last argument of each handler's {@code onAuthInit}
     */
    private void dispatchAuth(boolean sasl) {
        for (EventHandler handler : handlers) {
            handler.onAuthInit(this, getConnection(), sasl);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drains the event collector, delivering each event to every registered handler, and
     * then flushes pending output bytes.
     *
     * <p>If a dispatch is already in progress the method returns immediately without
     * flushing. An exception thrown while delivering an event is logged, recorded on the
     * connection as an {@code INTERNAL_ERROR} condition, and the connection is closed;
     * delivery then continues with the remaining handlers and events.
     */
    private void dispatch() {
        lock.lock();
        try {
            if (inDispatch) {
                // Already dispatching further up the stack; let that loop pick up new events.
                return;
            }
            inDispatch = true;
            try {
                for (Event event = collector.peek(); event != null; event = collector.peek()) {
                    for (EventHandler handler : handlers) {
                        if (log.isTraceEnabled()) {
                            log.trace("Handling " + event + " towards " + handler);
                        }
                        try {
                            Events.dispatch(event, handler);
                        } catch (Exception e) {
                            log.warn(e.getMessage(), e);
                            ErrorCondition failure = new ErrorCondition();
                            failure.setCondition(AmqpError.INTERNAL_ERROR);
                            failure.setDescription("Unrecoverable error: " + (e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName()));
                            connection.setCondition(failure);
                            connection.close();
                        }
                    }
                    // Remove the event only after every handler has seen it.
                    collector.pop();
                }
            } finally {
                inDispatch = false;
            }
        } finally {
            lock.unlock();
        }
        flushBytes();
    }

    /**
     * Opens the transport and the connection and flushes the result.
     *
     * <p>This method does not take the handler lock itself; only the trailing
     * {@link #flush()} does.
     *
     * @param containerId          container id to set on the connection
     * @param connectionProperties properties to set on the connection
     */
    public void open(String containerId, Map<Symbol, Object> connectionProperties) {
        transport.open();
        connection.setContainer(containerId);
        connection.setProperties(connectionProperties);
        connection.open();
        flush();
    }
}

