/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.proxy;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.streamnative.pulsar.handlers.kop.proxy.ConnectionToBroker;
import io.streamnative.pulsar.handlers.kop.proxy.KafkaProxyConfiguration;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConnectionFactory {
    private static final Logger log = LoggerFactory.getLogger(ConnectionFactory.class);
    private final AtomicInteger id = new AtomicInteger(0);
    private final List<InetSocketAddress> addresses;
    private final int connectTimeoutMs;
    private final EventLoopGroup eventLoopGroup;
    private final AddressResolver<InetSocketAddress> addressResolver;

    ConnectionFactory(KafkaProxyConfiguration config, EventLoopGroup eventLoopGroup, DnsAddressResolverGroup dnsAddressResolverGroup) {
        this.addresses = config.getKafkaBootstrapServers();
        this.connectTimeoutMs = config.getBrokerProxyConnectTimeoutMs();
        this.eventLoopGroup = eventLoopGroup;
        this.addressResolver = dnsAddressResolverGroup.getResolver((EventExecutor)eventLoopGroup.next());
    }

    ConnectionToBroker getAnyConnection() throws IOException {
        for (int i = 0; i < this.addresses.size(); ++i) {
            InetSocketAddress address = this.addresses.get(this.id.getAndIncrement() % this.addresses.size());
            try {
                return this.getConnection(address);
            }
            catch (IOException e) {
                if (i != this.addresses.size() - 1) continue;
                throw e;
            }
        }
        throw new IOException("Unknown error");
    }

    ConnectionToBroker getConnection(final InetSocketAddress unresolvedAddress) throws IOException {
        List addresses = (List)ConnectionFactory.waitFuture(this.addressResolver.resolveAll((SocketAddress)unresolvedAddress), "Resolve " + unresolvedAddress);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.eventLoopGroup);
        bootstrap.channel(EventLoopUtil.getClientSocketChannelClass((EventLoopGroup)this.eventLoopGroup));
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.connectTimeoutMs);
        bootstrap.option(ChannelOption.TCP_NODELAY, (Object)true);
        bootstrap.option(ChannelOption.ALLOCATOR, (Object)PulsarByteBufAllocator.DEFAULT);
        bootstrap.handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelHandler[]{new LengthFieldPrepender(4)});
                ch.pipeline().addLast("frameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(0x6400000, 0, 4, 0, 4));
                ch.pipeline().addLast("handler", (ChannelHandler)new ConnectionToBroker(unresolvedAddress));
            }
        });
        Channel registeredChannel = ConnectionFactory.waitChannelFuture(bootstrap.register(), "Register channel");
        for (int i = 0; i < addresses.size(); ++i) {
            InetSocketAddress address = (InetSocketAddress)addresses.get(i);
            try {
                Channel channel = ConnectionFactory.waitChannelFuture(registeredChannel.connect((SocketAddress)address), "Connect " + address);
                ConnectionToBroker cnx = (ConnectionToBroker)channel.pipeline().get("handler");
                if (cnx == null) {
                    throw new IOException("null handler in the pipeline of " + channel);
                }
                return cnx;
            }
            catch (IOException e) {
                if (i != addresses.size() - 1) continue;
                throw e;
            }
        }
        throw new IOException("Unknown error");
    }

    private static <T> T waitFuture(Future<T> future, String msg) throws IOException {
        try {
            if (log.isDebugEnabled()) {
                future.addListener(innerFuture -> {
                    if (innerFuture.isSuccess()) {
                        log.debug("{} succeeded", (Object)msg);
                    } else {
                        log.debug("{} failed: {}", (Object)msg, (Object)innerFuture.cause());
                    }
                });
            }
            return (T)future.get();
        }
        catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
        catch (InterruptedException e) {
            throw new IOException(Thread.currentThread().getName() + " is interrupted");
        }
    }

    private static Channel waitChannelFuture(ChannelFuture channelFuture, String msg) throws IOException {
        ConnectionFactory.waitFuture(channelFuture, msg);
        return channelFuture.channel();
    }
}

