/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.proxy;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import io.streamnative.pulsar.handlers.kop.proxy.InflightRequest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionToBroker
extends ChannelInboundHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(ConnectionToBroker.class);
    private final Map<Integer, InflightRequest> pendingRequests = new ConcurrentHashMap<Integer, InflightRequest>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final InetSocketAddress address;
    private volatile ChannelHandlerContext ctx = null;
    private volatile ChannelHandlerContext clientChannel = null;
    private volatile Runnable disconnectCallback = null;

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        this.ctx = ctx;
        log.info("[{}] Connection to broker is registered", (Object)ctx);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[{}] Unexpected error in ConnectionToBroker", (Object)ctx, (Object)cause);
        this.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        try {
            int readerIndex = buf.readerIndex();
            int correlationId = buf.getInt(0);
            InflightRequest inflightRequest = this.pendingRequests.remove(correlationId);
            if (inflightRequest == null) {
                log.warn("[{}] Correlation id {} is not pending", (Object)ctx, (Object)correlationId);
                this.close();
                return;
            }
            buf.readerIndex(readerIndex);
            if (inflightRequest.isSkipParsingResponse()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Received response of {} from broker", (Object)ctx, (Object)inflightRequest.getHeader());
                }
                inflightRequest.complete(buf.retain());
                return;
            }
            ByteBuffer buffer = buf.nioBuffer();
            AbstractResponse response = AbstractResponse.parseResponse((ByteBuffer)buffer, (RequestHeader)inflightRequest.getHeader());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received response {} from broker", (Object)ctx, (Object)response);
            }
            inflightRequest.complete(response);
        }
        catch (Throwable throwable) {
            log.error("[{}] Unexpected error when handling responses from broker", (Object)ctx, (Object)throwable);
            this.close();
        }
        finally {
            ReferenceCountUtil.safeRelease((Object)buf);
        }
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("[{}] Connection to broker is inactive", (Object)ctx);
        this.close();
        this.ctx = null;
    }

    void disconnectBroker() {
        if (this.ctx != null) {
            this.ctx.close();
        }
    }

    private void close() {
        if (this.closed.compareAndSet(false, true)) {
            if (this.clientChannel != null) {
                log.info("[{}] Close connection to client {}", (Object)this.ctx, (Object)this.clientChannel);
                this.clientChannel.close();
            }
            this.disconnectBroker();
            this.pendingRequests.values().forEach(inflightRequest -> inflightRequest.fail(new ConnectError("Connection is closed")));
            this.pendingRequests.clear();
            if (this.disconnectCallback != null) {
                this.disconnectCallback.run();
            }
        }
    }

    void forwardRequest(InflightRequest inflightRequest) {
        this.forwardRequest(inflightRequest, true);
    }

    void forwardRequest(InflightRequest inflightRequest, boolean cache) {
        ChannelHandlerContext ctx = this.ctx;
        if (ctx == null) {
            log.error("Channel is inactive when forwarding request {}", (Object)inflightRequest.getHeader());
            inflightRequest.fail(new ConnectError("Channel is not registered"));
            return;
        }
        if (this.closed.get()) {
            inflightRequest.fail(new ConnectError("Connection is closed"));
            return;
        }
        int correlationId = inflightRequest.getHeader().correlationId();
        if (cache && this.pendingRequests.putIfAbsent(correlationId, inflightRequest) != null) {
            log.error("[{}] Received request with the same correlation id {}", (Object)ctx, (Object)correlationId);
            inflightRequest.fail(new ConnectError("Duplicated correlation id " + correlationId));
            return;
        }
        inflightRequest.sendToChannel(ctx.channel());
    }

    ConnectionToBroker withDisconnectCallback(Runnable callback) {
        this.disconnectCallback = callback;
        return this;
    }

    ConnectionToBroker withClientChannel(ChannelHandlerContext clientChannel) {
        this.clientChannel = clientChannel;
        return this;
    }

    ConnectionToBroker forwardRequestsAndWait(List<InflightRequest> requests) throws IOException {
        requests.forEach(this::forwardRequest);
        for (InflightRequest request : requests) {
            request.waitForResponse();
        }
        return this;
    }

    public ConnectionToBroker(InetSocketAddress address) {
        this.address = address;
    }

    public InetSocketAddress getAddress() {
        return this.address;
    }

    static class ConnectError
    extends RuntimeException {
        public ConnectError(String msg) {
            super(msg);
        }
    }
}

