/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.container.common.transport.server;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManager;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.transport.server.GrpcXceiverService;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.ServerChannel;
import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll;
import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link XceiverServerSpi} implementation that serves container commands over a
 * gRPC (Netty) server and reports its replication type as {@code STAND_ALONE}.
 *
 * <p>The instance owns three resources created in the constructor: a fixed-size
 * executor used by the gRPC server, a Netty event loop group shared by the boss and
 * worker roles, and the gRPC {@link Server} itself. {@link #start()} and
 * {@link #stop()} are not synchronized; callers are presumably expected to invoke
 * them from a single thread (NOTE(review): confirm against callers).
 */
public final class XceiverServerGrpc implements XceiverServerSpi {
    private static final Logger LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
    // Not referenced anywhere inside this class; kept as-is.
    private static final String COMPONENT = "dn";
    /** Maximum inbound gRPC message size handed to the Netty server builder (32 MiB). */
    private static final int MAX_INBOUND_MESSAGE_SIZE = 32 * 1024 * 1024;
    /** Seconds to wait for the executor, and then for the server, during {@link #stop()}. */
    private static final long SHUTDOWN_WAIT_SECONDS = 5L;

    /** Configured port; 0 until {@link #start()} resolves a randomly chosen port. */
    private int port;
    private final UUID id;
    private final Server server;
    private final ContainerDispatcher storageContainer;
    private boolean isStarted;
    private final DatanodeDetails datanodeDetails;
    private final ThreadPoolExecutor readExecutors;
    private final EventLoopGroup eventLoopGroup;
    private final Class<? extends ServerChannel> channelType;

    /**
     * Builds (but does not start) the gRPC server.
     *
     * @param datanodeDetails identity of this datanode; supplies the UUID, the thread
     *                        name prefix, and receives the bound port on {@link #start()}
     * @param conf            configuration source; must not be null
     * @param dispatcher      dispatcher that executes container commands
     * @param caClient        certificate client, read only when both security and
     *                        gRPC TLS are enabled in {@code conf}
     */
    public XceiverServerGrpc(DatanodeDetails datanodeDetails, ConfigurationSource conf, ContainerDispatcher dispatcher, CertificateClient caClient) {
        Preconditions.checkNotNull(conf);
        this.id = datanodeDetails.getUuid();
        this.datanodeDetails = datanodeDetails;

        // Port 0 asks for any free port; the real one is picked up in start().
        int configuredPort = conf.getInt("dfs.container.ipc", 9859);
        boolean useRandomPort = conf.getBoolean("dfs.container.ipc.random.port", false);
        this.port = useRandomPort ? 0 : configuredPort;

        // One fixed-size pool, sized as (read threads per volume) x (storage dirs).
        int threadCountPerDisk = conf.getObject(DatanodeConfiguration.class).getNumReadThreadPerVolume();
        int numberOfDisks = HddsServerUtil.getDatanodeStorageDirs(conf).size();
        int poolSize = threadCountPerDisk * numberOfDisks;
        String threadNamePrefix = datanodeDetails.threadNamePrefix();
        this.readExecutors = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(), daemonThreadFactory(threadNamePrefix + "ChunkReader-%d"));

        // NOTE(review): for poolSize < 10 this evaluates to 0; Netty presumably falls
        // back to its own default thread count in that case - confirm.
        int eventLoopThreads = poolSize / 10;
        ThreadFactory eventLoopFactory = daemonThreadFactory(threadNamePrefix + "ChunkReader-ELG-%d");
        if (Epoll.isAvailable()) {
            this.eventLoopGroup = new EpollEventLoopGroup(eventLoopThreads, eventLoopFactory);
            this.channelType = EpollServerSocketChannel.class;
        } else {
            this.eventLoopGroup = new NioEventLoopGroup(eventLoopThreads, eventLoopFactory);
            this.channelType = NioServerSocketChannel.class;
        }

        boolean zeroCopyEnabled = conf.getBoolean("ozone.ec.grpc.zerocopy.enabled", true);
        LOG.info("GrpcServer channel type {}", this.channelType.getSimpleName());
        GrpcXceiverService xceiverService = new GrpcXceiverService(dispatcher, zeroCopyEnabled);

        // The builder is mutated in place, so each setting is applied as its own statement.
        NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(this.port);
        nettyServerBuilder.maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE);
        // The same group serves both the boss and the worker role.
        nettyServerBuilder.bossEventLoopGroup(this.eventLoopGroup);
        nettyServerBuilder.workerEventLoopGroup(this.eventLoopGroup);
        nettyServerBuilder.channelType(this.channelType);
        nettyServerBuilder.executor(this.readExecutors);
        nettyServerBuilder.addService(
                ServerInterceptors.intercept(xceiverService.bindServiceWithZeroCopy(), new GrpcServerInterceptor()));

        SecurityConfig secConf = new SecurityConfig(conf);
        if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) {
            configureTls(nettyServerBuilder, secConf, caClient);
        }
        this.server = nettyServerBuilder.build();
        this.storageContainer = dispatcher;
    }

    /** Creates a factory producing daemon threads named with the given format. */
    private static ThreadFactory daemonThreadFactory(String nameFormat) {
        return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
    }

    /**
     * Installs a server-side SSL context on the builder, using the first key manager
     * exposed by the certificate client and the SSL provider from the security config.
     *
     * <p>NOTE(review): any failure is only logged, so the server is subsequently built
     * without an SSL context even though TLS was requested - presumably that means a
     * non-TLS endpoint. Existing behaviour is preserved here; confirm whether
     * failing fast would be preferable.
     */
    private static void configureTls(NettyServerBuilder builder, SecurityConfig secConf, CertificateClient caClient) {
        try {
            KeyManager keyManager = caClient.getServerKeyStoresFactory().getKeyManagers()[0];
            SslContextBuilder sslContextBuilder =
                    GrpcSslContexts.configure(SslContextBuilder.forServer(keyManager), secConf.getGrpcSslProvider());
            builder.sslContext(sslContextBuilder.build());
        } catch (Exception ex) {
            LOG.error("Unable to setup TLS for secure datanode GRPC endpoint.", ex);
        }
    }

    /**
     * Returns the IPC port: the configured value, or the actual bound port once
     * {@link #start()} has resolved a random (0) port.
     */
    @Override
    public int getIPCPort() {
        return this.port;
    }

    /** Returns {@code STAND_ALONE}, the replication type served by this endpoint. */
    @Override
    public HddsProtos.ReplicationType getServerType() {
        return HddsProtos.ReplicationType.STAND_ALONE;
    }

    /**
     * Starts the gRPC server if it is not already started and records the bound port
     * as the {@code STANDALONE} port on the datanode details. Calling it again while
     * started is a no-op.
     *
     * @throws IOException if the underlying server fails to start
     */
    @Override
    public void start() throws IOException {
        if (this.isStarted) {
            return;
        }
        this.server.start();
        int realPort = this.server.getPort();
        if (this.port == 0) {
            // A random port was requested; remember what was actually bound.
            LOG.info("{} {} is started using port {}", this.getClass().getSimpleName(), this.id, realPort);
            this.port = realPort;
        }
        this.datanodeDetails.setPort(DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, realPort));
        this.isStarted = true;
    }

    /**
     * Stops the server if it was started: shuts down the executor, then the gRPC
     * server (waiting up to five seconds for each), then the event loop group.
     * Calling it when not started is a no-op.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining resources
     * are still told to shut down (without further blocking) and the interrupt flag
     * is restored.
     */
    @Override
    public void stop() {
        if (!this.isStarted) {
            return;
        }
        try {
            this.readExecutors.shutdown();
            this.readExecutors.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
            this.server.shutdown();
            this.server.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
            this.eventLoopGroup.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            LOG.error("failed to shutdown XceiverServerGrpc", e);
            // An interrupt may arrive before the server or the event loop group was
            // asked to stop; without this they would be left running while this
            // instance reports itself as stopped. Neither call below blocks.
            this.server.shutdownNow();
            this.eventLoopGroup.shutdownGracefully();
            Thread.currentThread().interrupt();
        }
        this.isStarted = false;
    }

    /**
     * Dispatches a container command inside a tracing span named
     * {@code "XceiverServerGrpc." + <command type>}.
     *
     * @param request    command to execute
     * @param pipelineID not used by this implementation; the dispatcher is invoked
     *                   with a {@code null} second argument
     * @throws IOException a {@link StorageContainerException} carrying the response
     *                     message and result when the result is not {@code SUCCESS},
     *                     or whatever the dispatcher itself throws
     */
    @Override
    public void submitRequest(ContainerProtos.ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID) throws IOException {
        String spanName = "XceiverServerGrpc." + request.getCmdType().name();
        Span span = TracingUtil.importAndCreateSpan(spanName, request.getTraceID());
        try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
            ContainerProtos.ContainerCommandResponseProto response = this.storageContainer.dispatch(request, null);
            if (response.getResult() != ContainerProtos.Result.SUCCESS) {
                throw new StorageContainerException(response.getMessage(), response.getResult());
            }
        } finally {
            // Runs after the scope is closed, on both the success and the failure path.
            span.finish();
        }
    }

    /**
     * Tells whether the given pipeline ID equals the pipeline ID derived from this
     * datanode's UUID.
     */
    @Override
    public boolean isExist(HddsProtos.PipelineID pipelineId) {
        HddsProtos.PipelineID ownPipelineId = PipelineID.valueOf(this.id).getProtobuf();
        return ownPipelineId.equals(pipelineId);
    }

    /**
     * Returns a single-element list holding one report whose pipeline ID is derived
     * from this datanode's UUID.
     */
    @Override
    public List<StorageContainerDatanodeProtocolProtos.PipelineReport> getPipelineReport() {
        StorageContainerDatanodeProtocolProtos.PipelineReport report =
                StorageContainerDatanodeProtocolProtos.PipelineReport.newBuilder()
                        .setPipelineID(PipelineID.valueOf(this.id).getProtobuf())
                        .build();
        return Collections.singletonList(report);
    }
}

