/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.extract.base;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.NettyClientHandler;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTooMuchRequestException;
import org.apache.dolphinscheduler.extract.base.future.InvokeCallback;
import org.apache.dolphinscheduler.extract.base.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
import org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyRemotingClient
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(NettyRemotingClient.class);
    private final Bootstrap bootstrap = new Bootstrap();
    private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128);
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final EventLoopGroup workerGroup;
    private final NettyClientConfig clientConfig;
    private final Semaphore asyncSemaphore = new Semaphore(1024, true);
    private final ExecutorService callbackExecutor;
    private final NettyClientHandler clientHandler;
    private final ScheduledExecutorService responseFutureExecutor;

    public NettyRemotingClient(NettyClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        ThreadFactory nettyClientThreadFactory = ThreadUtils.newDaemonThreadFactory((String)"NettyClientThread-");
        this.workerGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory) : new NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
        this.callbackExecutor = new ThreadPoolExecutor(Constants.CPUS, Constants.CPUS, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(1000), ThreadUtils.newDaemonThreadFactory((String)"NettyClientCallbackThread-"), new CallerThreadExecutePolicy());
        this.clientHandler = new NettyClientHandler(this, this.callbackExecutor);
        this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.newDaemonThreadFactory((String)"NettyClientResponseFutureThread-"));
        this.start();
    }

    private void start() {
        ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)this.bootstrap.group(this.workerGroup)).channel(NettyUtils.getSocketChannelClass())).option(ChannelOption.SO_KEEPALIVE, (Object)this.clientConfig.isSoKeepalive())).option(ChannelOption.TCP_NODELAY, (Object)this.clientConfig.isTcpNoDelay())).option(ChannelOption.SO_SNDBUF, (Object)this.clientConfig.getSendBufferSize())).option(ChannelOption.SO_RCVBUF, (Object)this.clientConfig.getReceiveBufferSize())).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.clientConfig.getConnectTimeoutMillis())).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast("client-idle-handler", (ChannelHandler)new IdleStateHandler(6000L, 0L, 0L, TimeUnit.MILLISECONDS)).addLast(new ChannelHandler[]{new TransporterDecoder(), NettyRemotingClient.this.clientHandler, new TransporterEncoder()});
            }
        });
        this.responseFutureExecutor.scheduleWithFixedDelay(ResponseFuture::scanFutureTable, 0L, 1L, TimeUnit.SECONDS);
        this.isStarted.compareAndSet(false, true);
    }

    public void sendAsync(Host host, Transporter transporter, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
        Channel channel = this.getChannel(host);
        if (channel == null) {
            throw new RemotingException("network error");
        }
        long opaque = transporter.getHeader().getOpaque();
        boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
            ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, releaseSemaphore);
            try {
                channel.writeAndFlush((Object)transporter).addListener(future -> {
                    if (future.isSuccess()) {
                        responseFuture.setSendOk(true);
                        return;
                    }
                    responseFuture.setSendOk(false);
                    responseFuture.setCause(future.cause());
                    responseFuture.putResponse(null);
                    try {
                        responseFuture.executeInvokeCallback();
                    }
                    catch (Exception ex) {
                        log.error("execute callback error", (Throwable)ex);
                    }
                    finally {
                        responseFuture.release();
                    }
                });
            }
            catch (Exception ex) {
                responseFuture.release();
                throw new RemotingException(String.format("Send transporter to host: %s failed", host), ex);
            }
        } else {
            String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits());
            throw new RemotingTooMuchRequestException(message);
        }
    }

    public IRpcResponse sendSync(Host host, Transporter transporter, long timeoutMillis) throws InterruptedException, RemotingException {
        Channel channel = this.getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        long opaque = transporter.getHeader().getOpaque();
        ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
        channel.writeAndFlush((Object)transporter).addListener(future -> {
            if (future.isSuccess()) {
                responseFuture.setSendOk(true);
                return;
            }
            responseFuture.setSendOk(false);
            responseFuture.setCause(future.cause());
            responseFuture.putResponse(null);
            log.error("Send Sync request {} to host {} failed", new Object[]{transporter, host, responseFuture.getCause()});
        });
        IRpcResponse iRpcResponse = responseFuture.waitResponse();
        if (iRpcResponse == null) {
            if (responseFuture.isSendOK()) {
                throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
            }
            throw new RemotingException(host.toString(), responseFuture.getCause());
        }
        return iRpcResponse;
    }

    public Channel getChannel(Host host) {
        Channel channel = this.channels.get(host);
        if (channel != null && channel.isActive()) {
            return channel;
        }
        return this.createChannel(host, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Channel createChannel(Host host, boolean isSync) {
        try {
            ChannelFuture future;
            Bootstrap bootstrap = this.bootstrap;
            synchronized (bootstrap) {
                future = this.bootstrap.connect((SocketAddress)new InetSocketAddress(host.getIp(), host.getPort()));
            }
            if (isSync) {
                future.sync();
            }
            if (future.isSuccess()) {
                Channel channel = future.channel();
                this.channels.put(host, channel);
                return channel;
            }
        }
        catch (Exception ex) {
            log.warn(String.format("connect to %s error", host), (Throwable)ex);
        }
        return null;
    }

    @Override
    public void close() {
        if (this.isStarted.compareAndSet(true, false)) {
            try {
                this.closeChannels();
                if (this.workerGroup != null) {
                    this.workerGroup.shutdownGracefully();
                }
                if (this.callbackExecutor != null) {
                    this.callbackExecutor.shutdownNow();
                }
                if (this.responseFutureExecutor != null) {
                    this.responseFutureExecutor.shutdownNow();
                }
                log.info("netty client closed");
            }
            catch (Exception ex) {
                log.error("netty client close exception", (Throwable)ex);
            }
        }
    }

    private void closeChannels() {
        for (Channel channel : this.channels.values()) {
            channel.close();
        }
        this.channels.clear();
    }

    public void closeChannel(Host host) {
        Channel channel = this.channels.remove(host);
        if (channel != null) {
            channel.close();
        }
    }
}

