/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.discovery.service;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandler;
import io.netty.handler.ssl.SslHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.discovery.service.DiscoveryService;
import org.apache.pulsar.discovery.service.PulsarServerException;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerConnection
extends PulsarHandler {
    private DiscoveryService service;
    private String authRole = null;
    private State state;
    public static final String TLS_HANDLER = "tls";
    private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class);

    public ServerConnection(DiscoveryService discoveryService) {
        super(0, TimeUnit.SECONDS);
        this.service = discoveryService;
        this.state = State.Start;
    }

    protected void handleConnect(PulsarApi.CommandConnect connect) {
        Preconditions.checkArgument((this.state == State.Start ? 1 : 0) != 0);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received CONNECT from {}", (Object)this.remoteAddress);
        }
        if (this.service.getConfiguration().isAuthenticationEnabled()) {
            try {
                String authMethod = "none";
                if (connect.hasAuthMethodName()) {
                    authMethod = connect.getAuthMethodName();
                } else if (connect.hasAuthMethod()) {
                    authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
                }
                String authData = connect.getAuthData().toStringUtf8();
                ChannelHandler sslHandler = this.ctx.channel().pipeline().get(TLS_HANDLER);
                SSLSession sslSession = null;
                if (sslHandler != null) {
                    sslSession = ((SslHandler)sslHandler).engine().getSession();
                }
                this.authRole = this.service.getAuthenticationService().authenticate((AuthenticationDataSource)new AuthenticationDataCommand(authData, this.remoteAddress, sslSession), authMethod);
                LOG.info("[{}] Client successfully authenticated with {} role {}", new Object[]{this.remoteAddress, authMethod, this.authRole});
            }
            catch (AuthenticationException e) {
                String msg = "Unable to authenticate";
                LOG.warn("[{}] {}: {}", new Object[]{this.remoteAddress, msg, e.getMessage()});
                this.ctx.writeAndFlush((Object)Commands.newError((long)-1L, (PulsarApi.ServerError)PulsarApi.ServerError.AuthenticationError, (String)msg));
                this.close();
                return;
            }
        }
        this.ctx.writeAndFlush((Object)Commands.newConnected((int)connect.getProtocolVersion()));
        this.state = State.Connected;
        this.remoteEndpointProtocolVersion = connect.getProtocolVersion();
    }

    protected void handlePartitionMetadataRequest(PulsarApi.CommandPartitionedTopicMetadata partitionMetadata) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received PartitionMetadataLookup from {}", (Object)this.remoteAddress);
        }
        this.sendPartitionMetadataResponse(partitionMetadata);
    }

    protected void handleLookup(PulsarApi.CommandLookupTopic lookup) {
        Preconditions.checkArgument((this.state == State.Connected ? 1 : 0) != 0);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received Lookup from {}", (Object)this.remoteAddress);
        }
        this.sendLookupResponse(lookup.getRequestId());
    }

    private void close() {
        this.ctx.close();
    }

    private void sendLookupResponse(long requestId) {
        try {
            LoadManagerReport availableBroker = this.service.getDiscoveryProvider().nextBroker();
            this.ctx.writeAndFlush((Object)Commands.newLookupResponse((String)availableBroker.getPulsarServiceUrl(), (String)availableBroker.getPulsarServiceUrlTls(), (boolean)false, (PulsarApi.CommandLookupTopicResponse.LookupType)PulsarApi.CommandLookupTopicResponse.LookupType.Redirect, (long)requestId, (boolean)false));
        }
        catch (PulsarServerException e) {
            LOG.warn("[{}] Failed to get next active broker {}", new Object[]{this.remoteAddress, e.getMessage(), e});
            this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)e.getMessage(), (long)requestId));
        }
    }

    private void sendPartitionMetadataResponse(PulsarApi.CommandPartitionedTopicMetadata partitionMetadata) {
        long requestId = partitionMetadata.getRequestId();
        DestinationName dn = DestinationName.get((String)partitionMetadata.getTopic());
        ((CompletableFuture)this.service.getDiscoveryProvider().getPartitionedTopicMetadata(this.service, dn, this.authRole).thenAccept(metadata -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Total number of partitions for topic {} is {}", new Object[]{this.authRole, dn, metadata.partitions});
            }
            this.ctx.writeAndFlush((Object)Commands.newPartitionMetadataResponse((int)metadata.partitions, (long)requestId));
        })).exceptionally(ex -> {
            LOG.warn("[{}] Failed to get partitioned metadata for topic {} {}", new Object[]{this.remoteAddress, dn, ex.getMessage(), ex});
            this.ctx.writeAndFlush((Object)Commands.newPartitionMetadataResponse((PulsarApi.ServerError)PulsarApi.ServerError.ServiceNotReady, (String)ex.getMessage(), (long)requestId));
            return null;
        });
    }

    protected boolean isHandshakeCompleted() {
        return this.state == State.Connected;
    }

    static enum State {
        Start,
        Connected;

    }
}

