/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import io.netty.buffer.ByteBuf;
import io.prometheus.client.Counter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.proxy.server.ProxyConnection;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LookupProxyHandler {
    private final ProxyService service;
    private final ProxyConnection proxyConnection;
    private final boolean connectWithTLS;
    private SocketAddress clientAddress;
    private String brokerServiceURL;
    private static final Counter lookupRequests = (Counter)Counter.build((String)"pulsar_proxy_lookup_requests", (String)"Counter of topic lookup requests").create().register();
    private static final Counter partitionsMetadataRequests = (Counter)Counter.build((String)"pulsar_proxy_partitions_metadata_requests", (String)"Counter of partitions metadata requests").create().register();
    private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class);

    public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) {
        this.service = proxy;
        this.proxyConnection = proxyConnection;
        this.clientAddress = proxyConnection.clientAddress();
        this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker();
        this.brokerServiceURL = this.connectWithTLS ? proxy.getConfiguration().getBrokerServiceURLTLS() : proxy.getConfiguration().getBrokerServiceURL();
    }

    public void handleLookup(PulsarApi.CommandLookupTopic lookup) {
        String serviceUrl;
        if (log.isDebugEnabled()) {
            log.debug("Received Lookup from {}", (Object)this.clientAddress);
        }
        lookupRequests.inc();
        long clientRequestId = lookup.getRequestId();
        String topic = lookup.getTopic();
        if (StringUtils.isBlank((CharSequence)this.brokerServiceURL)) {
            LoadManagerReport availableBroker = null;
            try {
                availableBroker = this.service.getDiscoveryProvider().nextBroker();
            }
            catch (Exception e) {
                log.warn("[{}] Failed to get next active broker {}", new Object[]{this.clientAddress, e.getMessage(), e});
                this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)e.getMessage(), (long)clientRequestId));
                return;
            }
            serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
        } else {
            serviceUrl = this.connectWithTLS ? this.service.getConfiguration().getBrokerServiceURLTLS() : this.service.getConfiguration().getBrokerServiceURL();
        }
        this.performLookup(clientRequestId, topic, serviceUrl, false, 10);
    }

    private void performLookup(long clientRequestId, String topic, String brokerServiceUrl, boolean authoritative, int numberOfRetries) {
        URI brokerURI;
        if (numberOfRetries == 0) {
            this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)"Reached max number of redirections", (long)clientRequestId));
            return;
        }
        try {
            brokerURI = new URI(brokerServiceUrl);
        }
        catch (URISyntaxException e) {
            this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)e.getMessage(), (long)clientRequestId));
            return;
        }
        InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", new Object[]{addr, topic, clientRequestId});
        }
        ((CompletableFuture)this.service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long requestId = this.service.newRequestId();
            ByteBuf command = this.service.getConfiguration().isAuthenticationEnabled() ? Commands.newLookup((String)topic, (boolean)authoritative, (String)this.proxyConnection.clientAuthRole, (String)this.proxyConnection.clientAuthData, (String)this.proxyConnection.clientAuthMethod, (long)requestId) : Commands.newLookup((String)topic, (boolean)authoritative, (long)requestId);
            ((CompletableFuture)clientCnx.newLookup(command, requestId).thenAccept(result -> {
                if (result.redirect) {
                    this.performLookup(clientRequestId, topic, result.brokerUrl, authoritative, numberOfRetries - 1);
                } else {
                    String brokerUrl = this.connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
                    this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupResponse((String)brokerUrl, (String)brokerUrl, (boolean)true, (PulsarApi.CommandLookupTopicResponse.LookupType)PulsarApi.CommandLookupTopicResponse.LookupType.Connect, (long)clientRequestId, (boolean)true));
                }
            })).exceptionally(ex -> {
                log.warn("[{}] Failed to lookup topic {}: {}", new Object[]{this.clientAddress, topic, ex.getMessage()});
                this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)clientRequestId));
                return null;
            });
        })).exceptionally(ex -> {
            this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)clientRequestId));
            return null;
        });
    }

    public void handlePartitionMetadataResponse(PulsarApi.CommandPartitionedTopicMetadata partitionMetadata) {
        partitionsMetadataRequests.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup", (Object)this.clientAddress);
        }
        long clientRequestId = partitionMetadata.getRequestId();
        DestinationName dn = DestinationName.get((String)partitionMetadata.getTopic());
        if (StringUtils.isBlank((CharSequence)this.brokerServiceURL)) {
            ((CompletableFuture)this.service.getDiscoveryProvider().getPartitionedTopicMetadata(this.service, dn, this.proxyConnection.clientAuthRole, this.proxyConnection.authenticationData).thenAccept(metadata -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Total number of partitions for topic {} is {}", new Object[]{this.proxyConnection.clientAuthRole, dn, metadata.partitions});
                }
                this.proxyConnection.ctx().writeAndFlush((Object)Commands.newPartitionMetadataResponse((int)metadata.partitions, (long)clientRequestId));
            })).exceptionally(ex -> {
                log.warn("[{}] Failed to get partitioned metadata for topic {} {}", new Object[]{this.clientAddress, dn, ex.getMessage(), ex});
                this.proxyConnection.ctx().writeAndFlush((Object)Commands.newPartitionMetadataResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)clientRequestId));
                return null;
            });
        } else {
            URI brokerURI;
            try {
                brokerURI = new URI(this.brokerServiceURL);
            }
            catch (URISyntaxException e) {
                this.proxyConnection.ctx().writeAndFlush((Object)Commands.newPartitionMetadataResponse((PulsarApi.ServerError)PulsarApi.ServerError.MetadataError, (String)e.getMessage(), (long)clientRequestId));
                return;
            }
            InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());
            if (log.isDebugEnabled()) {
                log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", new Object[]{addr, dn.getPartitionedTopicName(), clientRequestId});
            }
            ((CompletableFuture)this.service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
                long requestId = this.service.newRequestId();
                ByteBuf command = this.service.getConfiguration().isAuthenticationEnabled() ? Commands.newPartitionMetadataRequest((String)dn.toString(), (long)requestId, (String)this.proxyConnection.clientAuthRole, (String)this.proxyConnection.clientAuthData, (String)this.proxyConnection.clientAuthMethod) : Commands.newPartitionMetadataRequest((String)dn.toString(), (long)requestId);
                ((CompletableFuture)clientCnx.newLookup(command, requestId).thenAccept(lookupDataResult -> this.proxyConnection.ctx().writeAndFlush((Object)Commands.newPartitionMetadataResponse((int)lookupDataResult.partitions, (long)clientRequestId)))).exceptionally(ex -> {
                    log.warn("[{}] failed to get Partitioned metadata : {}", new Object[]{dn.toString(), ex.getCause().getMessage(), ex});
                    this.proxyConnection.ctx().writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)clientRequestId));
                    return null;
                });
            })).exceptionally(ex -> {
                this.proxyConnection.ctx().writeAndFlush((Object)Commands.newPartitionMetadataResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)clientRequestId));
                return null;
            });
        }
    }
}

