/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.pipeline;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RatisPipelineProvider
implements PipelineProvider {
    private static final Logger LOG = LoggerFactory.getLogger(RatisPipelineProvider.class);
    private final NodeManager nodeManager;
    private final PipelineStateManager stateManager;
    private final Configuration conf;
    private final int parallelismForPool = 3;
    private final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
        ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        worker.setName("RATISCREATEPIPELINE" + worker.getPoolIndex());
        return worker;
    };
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(3, this.factory, null, false);
    private final GrpcTlsConfig tlsConfig;

    RatisPipelineProvider(NodeManager nodeManager, PipelineStateManager stateManager, Configuration conf, GrpcTlsConfig tlsConfig) {
        this.nodeManager = nodeManager;
        this.stateManager = stateManager;
        this.conf = conf;
        this.tlsConfig = tlsConfig;
    }

    private static ContainerPlacementPolicy createContainerPlacementPolicy(NodeManager nodeManager, Configuration conf) {
        Class implClass = conf.getClass("ozone.scm.container.placement.impl", SCMContainerPlacementRandom.class);
        try {
            Constructor ctor = implClass.getDeclaredConstructor(NodeManager.class, Configuration.class);
            return (ContainerPlacementPolicy)ctor.newInstance(nodeManager, conf);
        }
        catch (RuntimeException e) {
            throw e;
        }
        catch (InvocationTargetException e) {
            throw new RuntimeException(implClass.getName() + " could not be constructed.", e.getCause());
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Unable to load ContainerPlacementPolicy", e);
        }
    }

    @Override
    public Pipeline create(HddsProtos.ReplicationFactor factor) throws IOException {
        HashSet dnsUsed = new HashSet();
        this.stateManager.getPipelines(HddsProtos.ReplicationType.RATIS, factor).stream().filter(p -> p.getPipelineState().equals((Object)Pipeline.PipelineState.OPEN) || p.getPipelineState().equals((Object)Pipeline.PipelineState.ALLOCATED)).forEach(p -> dnsUsed.addAll(p.getNodes()));
        List dns = this.nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).parallelStream().filter(dn -> !dnsUsed.contains(dn)).limit(factor.getNumber()).collect(Collectors.toList());
        if (dns.size() < factor.getNumber()) {
            String e = String.format("Cannot create pipeline of factor %d using %d nodes.", factor.getNumber(), dns.size());
            throw new InsufficientDatanodesException(e);
        }
        Pipeline pipeline = Pipeline.newBuilder().setId(PipelineID.randomId()).setState(Pipeline.PipelineState.OPEN).setType(HddsProtos.ReplicationType.RATIS).setFactor(factor).setNodes(dns).build();
        this.initializePipeline(pipeline);
        return pipeline;
    }

    @Override
    public Pipeline create(HddsProtos.ReplicationFactor factor, List<DatanodeDetails> nodes) {
        return Pipeline.newBuilder().setId(PipelineID.randomId()).setState(Pipeline.PipelineState.OPEN).setType(HddsProtos.ReplicationType.RATIS).setFactor(factor).setNodes(nodes).build();
    }

    @Override
    public void shutdown() {
        this.forkJoinPool.shutdownNow();
        try {
            this.forkJoinPool.awaitTermination(60L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            LOG.error("Unexpected exception occurred during shutdown of RatisPipelineProvider", (Throwable)e);
        }
    }

    protected void initializePipeline(Pipeline pipeline) throws IOException {
        RaftGroup group = RatisHelper.newRaftGroup((Pipeline)pipeline);
        LOG.debug("creating pipeline:{} with {}", (Object)pipeline.getId(), (Object)group);
        this.callRatisRpc(pipeline.getNodes(), (CheckedBiConsumer<RaftClient, RaftPeer, IOException>)((CheckedBiConsumer)(raftClient, peer) -> {
            RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
            if (reply == null || !reply.isSuccess()) {
                String msg = "Pipeline initialization failed for pipeline:" + pipeline.getId() + " node:" + peer.getId();
                LOG.error(msg);
                throw new IOException(msg);
            }
        }));
    }

    private void callRatisRpc(List<DatanodeDetails> datanodes, CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc) throws IOException {
        if (datanodes.isEmpty()) {
            return;
        }
        String rpcType = this.conf.get("dfs.container.ratis.rpc.type", "GRPC");
        RetryPolicy retryPolicy = RatisHelper.createRetryPolicy((Configuration)this.conf);
        List exceptions = Collections.synchronizedList(new ArrayList());
        int maxOutstandingRequests = HddsClientUtils.getMaxOutstandingRequests((Configuration)this.conf);
        TimeDuration requestTimeout = RatisHelper.getClientRequestTimeout((Configuration)this.conf);
        try {
            ((ForkJoinTask)this.forkJoinPool.submit(() -> datanodes.parallelStream().forEach(d -> {
                RaftPeer p = RatisHelper.toRaftPeer((DatanodeDetails)d);
                try (RaftClient client = RatisHelper.newRaftClient((RpcType)SupportedRpcType.valueOfIgnoreCase((String)rpcType), (RaftPeer)p, (RetryPolicy)retryPolicy, (int)maxOutstandingRequests, (GrpcTlsConfig)this.tlsConfig, (TimeDuration)requestTimeout);){
                    rpc.accept((Object)client, (Object)p);
                }
                catch (IOException ioe) {
                    String errMsg = "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
                    LOG.error(errMsg, (Throwable)ioe);
                    exceptions.add(new IOException(errMsg, ioe));
                }
            }))).get();
        }
        catch (ExecutionException | RejectedExecutionException ex) {
            LOG.error(ex.getClass().getName() + " exception occurred during createPipeline", (Throwable)ex);
            throw new IOException(ex.getClass().getName() + " exception occurred during createPipeline", ex);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupt exception occurred during createPipeline", ex);
        }
        if (!exceptions.isEmpty()) {
            throw MultipleIOException.createIOException(exceptions);
        }
    }
}

