/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.pipeline;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelinePlacementPolicy;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RatisPipelineProvider
implements PipelineProvider {
    private static final Logger LOG = LoggerFactory.getLogger(RatisPipelineProvider.class);
    private final NodeManager nodeManager;
    private final PipelineStateManager stateManager;
    private final Configuration conf;
    private final EventPublisher eventPublisher;
    private final PipelinePlacementPolicy placementPolicy;
    private int pipelineNumberLimit;
    private int maxPipelinePerDatanode;
    private final int parallelismForPool = 3;
    private final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
        ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        worker.setName("RATISCREATEPIPELINE" + worker.getPoolIndex());
        return worker;
    };
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(3, this.factory, null, false);

    RatisPipelineProvider(NodeManager nodeManager, PipelineStateManager stateManager, Configuration conf, EventPublisher eventPublisher) {
        this.nodeManager = nodeManager;
        this.stateManager = stateManager;
        this.conf = conf;
        this.eventPublisher = eventPublisher;
        this.placementPolicy = new PipelinePlacementPolicy(nodeManager, stateManager, conf);
        this.pipelineNumberLimit = conf.getInt("ozone.scm.ratis.pipeline.limit", 0);
        this.maxPipelinePerDatanode = conf.getInt("ozone.datanode.pipeline.limit", 2);
    }

    private List<DatanodeDetails> pickNodesNeverUsed(HddsProtos.ReplicationFactor factor) throws SCMException {
        HashSet dnsUsed = new HashSet();
        this.stateManager.getPipelines(HddsProtos.ReplicationType.RATIS, factor).stream().filter(p -> p.getPipelineState().equals((Object)Pipeline.PipelineState.OPEN) || p.getPipelineState().equals((Object)Pipeline.PipelineState.DORMANT) || p.getPipelineState().equals((Object)Pipeline.PipelineState.ALLOCATED)).forEach(p -> dnsUsed.addAll(p.getNodes()));
        List<DatanodeDetails> dns = this.nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).parallelStream().filter(dn -> !dnsUsed.contains(dn)).limit(factor.getNumber()).collect(Collectors.toList());
        if (dns.size() < factor.getNumber()) {
            String e = String.format("Cannot create pipeline of factor %d using %d nodes. Used %d nodes. Healthy nodes %d", factor.getNumber(), dns.size(), dnsUsed.size(), this.nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
            throw new SCMException(e, SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
        }
        return dns;
    }

    private boolean exceedPipelineNumberLimit(HddsProtos.ReplicationFactor factor) {
        if (factor != HddsProtos.ReplicationFactor.THREE) {
            return false;
        }
        if (this.maxPipelinePerDatanode > 0) {
            return this.stateManager.getPipelines(HddsProtos.ReplicationType.RATIS, factor).size() - this.stateManager.getPipelines(HddsProtos.ReplicationType.RATIS, factor, Pipeline.PipelineState.CLOSED).size() > this.maxPipelinePerDatanode * this.nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) / factor.getNumber();
        }
        if (this.pipelineNumberLimit > 0) {
            return this.stateManager.getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE).size() - this.stateManager.getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.CLOSED).size() > this.pipelineNumberLimit - this.stateManager.getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE).size();
        }
        return false;
    }

    @Override
    public Pipeline create(HddsProtos.ReplicationFactor factor) throws IOException {
        List<DatanodeDetails> dns;
        if (this.exceedPipelineNumberLimit(factor)) {
            throw new SCMException("Ratis pipeline number meets the limit: " + this.pipelineNumberLimit + " factor : " + factor.getNumber(), SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
        }
        switch (factor) {
            case ONE: {
                dns = this.pickNodesNeverUsed(HddsProtos.ReplicationFactor.ONE);
                break;
            }
            case THREE: {
                dns = this.placementPolicy.chooseDatanodes(null, null, factor.getNumber(), 0L);
                break;
            }
            default: {
                throw new IllegalStateException("Unknown factor: " + factor.name());
            }
        }
        Pipeline pipeline = Pipeline.newBuilder().setId(PipelineID.randomId()).setState(Pipeline.PipelineState.ALLOCATED).setType(HddsProtos.ReplicationType.RATIS).setFactor(factor).setNodes(dns).build();
        CreatePipelineCommand createCommand = new CreatePipelineCommand(pipeline.getId(), pipeline.getType(), factor, dns);
        dns.forEach(node -> {
            LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{}", (Object)pipeline.getId(), (Object)node.getUuidString());
            this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, (Object)new CommandForDatanode(node.getUuid(), (SCMCommand)createCommand));
        });
        return pipeline;
    }

    @Override
    public Pipeline create(HddsProtos.ReplicationFactor factor, List<DatanodeDetails> nodes) {
        return Pipeline.newBuilder().setId(PipelineID.randomId()).setState(Pipeline.PipelineState.ALLOCATED).setType(HddsProtos.ReplicationType.RATIS).setFactor(factor).setNodes(nodes).build();
    }

    @Override
    public void shutdown() {
        this.forkJoinPool.shutdownNow();
        try {
            this.forkJoinPool.awaitTermination(60L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            LOG.error("Unexpected exception occurred during shutdown of RatisPipelineProvider", (Throwable)e);
        }
    }

    @Override
    public void close(Pipeline pipeline) {
        ClosePipelineCommand closeCommand = new ClosePipelineCommand(pipeline.getId());
        pipeline.getNodes().stream().forEach(node -> {
            CommandForDatanode datanodeCommand = new CommandForDatanode(node.getUuid(), (SCMCommand)closeCommand);
            LOG.info("Send pipeline:{} close command to datanode {}", (Object)pipeline.getId(), (Object)datanodeCommand.getDatanodeId());
            this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, (Object)datanodeCommand);
        });
    }
}

