/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.proxy;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCounted;
import io.streamnative.pulsar.handlers.kop.proxy.ConnectionFactory;
import io.streamnative.pulsar.handlers.kop.proxy.ConnectionToBroker;
import io.streamnative.pulsar.handlers.kop.proxy.InflightRequest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.KopResponseUtils;
import org.apache.kafka.common.requests.RequestHeader;

/**
 * Holds the broker connections opened on behalf of one client channel: one connection per
 * partition leader plus lazily created connections used for metadata, group coordination,
 * transaction coordination and authentication.
 *
 * <p>Every SASL request passed to {@link #authenticate(InflightRequest)} is remembered (as a
 * retained buffer) and handed to {@code forwardRequestsAndWait} whenever a new leader, metadata or
 * coordinator connection is created.
 *
 * <p>The single-connection fields are {@code volatile} and are reset to {@code null} by the
 * disconnect callback registered on the corresponding connection, so they may change at any time;
 * the methods below therefore work on local snapshots of those fields.
 */
public class BrokerConnectionGroup {
    /** Placeholder address attached to the internally generated FindCoordinator request. */
    private static final InetSocketAddress DUMMY_ADDRESS = InetSocketAddress.createUnresolved("localhost", 19092);
    /** Version used for both the FindCoordinator request body and its header. */
    private static final short FIND_COORDINATOR_VERSION = 3;

    private final ConnectionFactory connectionFactory;
    @VisibleForTesting
    final Map<InetSocketAddress, ConnectionToBroker> leaderBrokers = new ConcurrentHashMap<InetSocketAddress, ConnectionToBroker>();
    /** Retained buffers of the SASL requests seen so far; released in {@link #close()}. */
    private final List<ByteBuf> saslRequestBuffers = new CopyOnWriteArrayList<ByteBuf>();
    @VisibleForTesting
    volatile ConnectionToBroker metadataBroker;
    volatile ConnectionToBroker groupCoordinator;
    volatile ConnectionToBroker transactionCoordinator;
    @VisibleForTesting
    volatile ConnectionToBroker authenticateBroker;
    private ChannelHandlerContext clientChannel = null;

    /**
     * Creates an empty group; connections are opened on demand through the given factory.
     *
     * @param connectionFactory factory used to open every broker connection of this group
     */
    public BrokerConnectionGroup(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * Sets the client channel that is attached to connections created afterwards.
     *
     * @param clientChannel the client's channel context
     */
    public void setClientChannel(ChannelHandlerContext clientChannel) {
        this.clientChannel = clientChannel;
    }

    /**
     * Returns the connection to the given leader, creating and caching it on first use.
     *
     * <p>NOTE(review): the lookup and the insertion are not atomic, so two concurrent callers
     * asking for the same leader could each create a connection. This matches the original
     * behavior; confirm whether callers are confined to a single thread.
     *
     * @param leader address of the partition leader
     * @return the cached or newly created connection
     * @throws IOException if creating the connection or replaying the SASL requests fails
     */
    public ConnectionToBroker getLeader(InetSocketAddress leader) throws IOException {
        ConnectionToBroker connection = this.leaderBrokers.get(leader);
        if (connection == null) {
            connection = this.connectionFactory.getConnection(leader)
                    .withClientChannel(this.clientChannel)
                    .withDisconnectCallback(() -> this.leaderBrokers.remove(leader))
                    .forwardRequestsAndWait(this.createSaslRequests());
            this.leaderBrokers.put(leader, connection);
        }
        return connection;
    }

    /**
     * Returns the connection used for metadata requests, creating it on first use from
     * {@code ConnectionFactory.getAnyConnection()}.
     *
     * @return the metadata connection, never {@code null}
     * @throws IOException if creating the connection or replaying the SASL requests fails
     */
    public ConnectionToBroker getMetadataBroker() throws IOException {
        // Read the volatile field once: the disconnect callback may null it between two reads.
        ConnectionToBroker broker = this.metadataBroker;
        if (broker != null) {
            return broker;
        }
        synchronized (this) {
            broker = this.metadataBroker;
            if (broker == null) {
                broker = this.connectionFactory.getAnyConnection()
                        .withClientChannel(this.clientChannel)
                        .withDisconnectCallback(() -> {
                            this.metadataBroker = null;
                        })
                        .forwardRequestsAndWait(this.createSaslRequests());
                this.metadataBroker = broker;
            }
            return broker;
        }
    }

    /**
     * Returns the connection to the group coordinator, resolving it with a FindCoordinator
     * request on first use.
     *
     * <p>The connection is cached in a single field, so once it exists it is returned for every
     * {@code groupId} until it is disconnected.
     *
     * @param groupId consumer group id used as the FindCoordinator key
     * @return the group coordinator connection, never {@code null}
     * @throws IOException if the lookup, the connection or the SASL replay fails
     */
    public ConnectionToBroker getGroupCoordinator(String groupId) throws IOException {
        ConnectionToBroker coordinator = this.groupCoordinator;
        if (coordinator != null) {
            return coordinator;
        }
        synchronized (this) {
            coordinator = this.groupCoordinator;
            if (coordinator == null) {
                InetSocketAddress address = this.findCoordinator(groupId, FindCoordinatorRequest.CoordinatorType.GROUP);
                coordinator = this.connectionFactory.getConnection(address)
                        .withClientChannel(this.clientChannel)
                        .withDisconnectCallback(() -> {
                            this.groupCoordinator = null;
                        })
                        .forwardRequestsAndWait(this.createSaslRequests());
                this.groupCoordinator = coordinator;
            }
            return coordinator;
        }
    }

    /**
     * Returns the connection to the transaction coordinator, resolving it with a FindCoordinator
     * request on first use. When {@code txnId} is {@code null} the metadata connection is
     * returned instead.
     *
     * <p>The connection is cached in a single field, so once it exists it is returned for every
     * non-null {@code txnId} until it is disconnected.
     *
     * @param txnId transactional id used as the FindCoordinator key, may be {@code null}
     * @return the transaction coordinator (or metadata) connection, never {@code null}
     * @throws IOException if the lookup, the connection or the SASL replay fails
     */
    public ConnectionToBroker getTransactionCoordinator(String txnId) throws IOException {
        if (txnId == null) {
            return this.getMetadataBroker();
        }
        ConnectionToBroker coordinator = this.transactionCoordinator;
        if (coordinator != null) {
            return coordinator;
        }
        synchronized (this) {
            coordinator = this.transactionCoordinator;
            if (coordinator == null) {
                InetSocketAddress address = this.findCoordinator(txnId, FindCoordinatorRequest.CoordinatorType.TRANSACTION);
                coordinator = this.connectionFactory.getConnection(address)
                        .withClientChannel(this.clientChannel)
                        .withDisconnectCallback(() -> {
                            this.transactionCoordinator = null;
                        })
                        .forwardRequestsAndWait(this.createSaslRequests());
                this.transactionCoordinator = coordinator;
            }
            return coordinator;
        }
    }

    /**
     * Sends a FindCoordinator request through the metadata connection and waits for the answer.
     *
     * @param key     group id or transactional id to look up
     * @param keyType kind of coordinator to look up
     * @return the unresolved address of the node named in the response
     * @throws IOException if the metadata connection cannot be obtained or the request fails
     */
    private InetSocketAddress findCoordinator(String key, FindCoordinatorRequest.CoordinatorType keyType) throws IOException {
        FindCoordinatorRequestData data = new FindCoordinatorRequestData().setKey(key).setKeyType(keyType.id());
        AbstractRequest request = new FindCoordinatorRequest.Builder(data).build(FIND_COORDINATOR_VERSION);
        RequestHeader header = new RequestHeader(ApiKeys.FIND_COORDINATOR, FIND_COORDINATOR_VERSION, "kop-proxy", 0);
        ByteBuf buf = KopResponseUtils.serializeRequestToPooledBuffer(header, request);
        InflightRequest inflightRequest = new InflightRequest(buf, DUMMY_ADDRESS);
        this.getMetadataBroker().forwardRequest(inflightRequest);
        Node node = ((FindCoordinatorResponse) inflightRequest.waitForResponse()).node();
        return InetSocketAddress.createUnresolved(node.host(), node.port());
    }

    /**
     * Returns the connection used for the client's own authentication exchange, creating it on
     * first use. Unlike the other connections it gets no client channel and no SASL replay.
     *
     * @return the authentication connection, never {@code null}
     * @throws IOException if the connection cannot be created
     */
    @VisibleForTesting
    ConnectionToBroker getAuthenticateBroker() throws IOException {
        ConnectionToBroker broker = this.authenticateBroker;
        if (broker != null) {
            return broker;
        }
        synchronized (this) {
            broker = this.authenticateBroker;
            if (broker == null) {
                broker = this.connectionFactory.getAnyConnection().withDisconnectCallback(() -> {
                    this.authenticateBroker = null;
                });
                this.authenticateBroker = broker;
            }
            return broker;
        }
    }

    /**
     * Forwards a SASL request to the authentication connection and remembers a retained copy of
     * its buffer so it can be replayed on connections created later.
     *
     * @param inflightRequest the client's SASL request
     * @throws IOException if the authentication connection cannot be obtained or used
     */
    public void authenticate(InflightRequest inflightRequest) throws IOException {
        this.getAuthenticateBroker().forwardRequest(inflightRequest);
        this.saslRequestBuffers.add(inflightRequest.getRetainedBuffer());
    }

    /**
     * Releases the remembered SASL buffers, forgets the client channel and disconnects every
     * broker connection held by this group, including the transaction coordinator.
     */
    public void close() {
        this.saslRequestBuffers.forEach(ReferenceCounted::release);
        this.saslRequestBuffers.clear();
        this.clientChannel = null;

        disconnectIfPresent(this.metadataBroker);
        this.metadataBroker = null;
        disconnectIfPresent(this.groupCoordinator);
        this.groupCoordinator = null;
        // Previously skipped, which left the transaction coordinator connection open.
        disconnectIfPresent(this.transactionCoordinator);
        this.transactionCoordinator = null;
        disconnectIfPresent(this.authenticateBroker);
        this.authenticateBroker = null;

        this.leaderBrokers.values().forEach(ConnectionToBroker::disconnectBroker);
        this.leaderBrokers.clear();
    }

    /**
     * Disconnects the given connection unless it is {@code null}. Taking the connection as an
     * argument means the volatile field is read only once by the caller.
     */
    private static void disconnectIfPresent(ConnectionToBroker connection) {
        if (connection != null) {
            connection.disconnectBroker();
        }
    }

    /**
     * Wraps every remembered SASL buffer in a new {@link InflightRequest} addressed from the
     * client's remote address, or from a placeholder address when no client channel is set.
     */
    private List<InflightRequest> createSaslRequests() {
        // Snapshot the field: close() may null it between the check and the use.
        ChannelHandlerContext channel = this.clientChannel;
        SocketAddress address = channel != null
                ? channel.channel().remoteAddress()
                : InetSocketAddress.createUnresolved("localhost", 65535);
        return this.saslRequestBuffers.stream().map(buffer -> new InflightRequest(buffer, address)).toList();
    }
}

