/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.pipeline;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelinePlacementPolicy;
import org.apache.hadoop.hdds.scm.pipeline.PipelinePlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicy;
import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicyFactory;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RatisPipelineProvider
extends PipelineProvider<RatisReplicationConfig> {
    private static final Logger LOG = LoggerFactory.getLogger(RatisPipelineProvider.class);
    private final ConfigurationSource conf;
    private final EventPublisher eventPublisher;
    private final PlacementPolicy placementPolicy;
    private int pipelineNumberLimit;
    private int maxPipelinePerDatanode;
    private final LeaderChoosePolicy leaderChoosePolicy;
    private final SCMContext scmContext;
    private final long containerSizeBytes;
    private final long minRatisVolumeSizeBytes;

    @VisibleForTesting
    public RatisPipelineProvider(NodeManager nodeManager, PipelineStateManager stateManager, ConfigurationSource conf, EventPublisher eventPublisher, SCMContext scmContext) {
        super(nodeManager, stateManager);
        this.conf = conf;
        this.eventPublisher = eventPublisher;
        this.scmContext = scmContext;
        this.placementPolicy = PipelinePlacementPolicyFactory.getPolicy(nodeManager, stateManager, conf);
        this.pipelineNumberLimit = conf.getInt("ozone.scm.ratis.pipeline.limit", 0);
        String dnLimit = conf.get("ozone.scm.datanode.pipeline.limit");
        this.maxPipelinePerDatanode = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
        this.containerSizeBytes = (long)this.conf.getStorageSize("ozone.scm.container.size", "5GB", StorageUnit.BYTES);
        this.minRatisVolumeSizeBytes = (long)this.conf.getStorageSize("ozone.scm.datanode.ratis.volume.free-space.min", "1GB", StorageUnit.BYTES);
        try {
            this.leaderChoosePolicy = LeaderChoosePolicyFactory.getPolicy(conf, nodeManager, stateManager);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private boolean exceedPipelineNumberLimit(RatisReplicationConfig replicationConfig) {
        if (replicationConfig.getReplicationFactor() != HddsProtos.ReplicationFactor.THREE) {
            return false;
        }
        if (this.maxPipelinePerDatanode > 0) {
            return this.getPipelineStateManager().getPipelines((ReplicationConfig)replicationConfig).size() - this.getPipelineStateManager().getPipelines((ReplicationConfig)replicationConfig, Pipeline.PipelineState.CLOSED).size() > this.maxPipelinePerDatanode * this.getNodeManager().getNodeCount(NodeStatus.inServiceHealthy()) / replicationConfig.getRequiredNodes();
        }
        if (this.pipelineNumberLimit > 0) {
            return this.getPipelineStateManager().getPipelines((ReplicationConfig)replicationConfig).size() - this.getPipelineStateManager().getPipelines((ReplicationConfig)replicationConfig, Pipeline.PipelineState.CLOSED).size() > this.pipelineNumberLimit - this.getPipelineStateManager().getPipelines((ReplicationConfig)RatisReplicationConfig.getInstance((HddsProtos.ReplicationFactor)HddsProtos.ReplicationFactor.ONE)).size();
        }
        return false;
    }

    @VisibleForTesting
    public LeaderChoosePolicy getLeaderChoosePolicy() {
        return this.leaderChoosePolicy;
    }

    @Override
    public synchronized Pipeline create(RatisReplicationConfig replicationConfig) throws IOException {
        return this.create(replicationConfig, Collections.emptyList(), Collections.emptyList());
    }

    @Override
    public synchronized Pipeline create(RatisReplicationConfig replicationConfig, List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes) throws IOException {
        List<DatanodeDetails> dns;
        if (this.exceedPipelineNumberLimit(replicationConfig)) {
            throw new SCMException("Ratis pipeline number meets the limit: " + this.pipelineNumberLimit + " replicationConfig : " + replicationConfig, SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
        }
        HddsProtos.ReplicationFactor factor = replicationConfig.getReplicationFactor();
        switch (factor) {
            case ONE: {
                dns = this.pickNodesNotUsed(replicationConfig, this.minRatisVolumeSizeBytes, this.containerSizeBytes);
                break;
            }
            case THREE: {
                List<DatanodeDetails> excludeDueToEngagement = this.filterPipelineEngagement();
                if (excludeDueToEngagement.size() > 0) {
                    if (excludedNodes.size() == 0) {
                        excludedNodes = excludeDueToEngagement;
                    } else {
                        excludedNodes.addAll(excludeDueToEngagement);
                    }
                }
                dns = this.placementPolicy.chooseDatanodes(excludedNodes, favoredNodes, factor.getNumber(), this.minRatisVolumeSizeBytes, this.containerSizeBytes);
                break;
            }
            default: {
                throw new IllegalStateException("Unknown factor: " + factor.name());
            }
        }
        DatanodeDetails suggestedLeader = this.leaderChoosePolicy.chooseLeader(dns);
        Pipeline pipeline = Pipeline.newBuilder().setId(PipelineID.randomId()).setState(Pipeline.PipelineState.ALLOCATED).setReplicationConfig((ReplicationConfig)RatisReplicationConfig.getInstance((HddsProtos.ReplicationFactor)factor)).setNodes(dns).setSuggestedLeaderId(suggestedLeader != null ? suggestedLeader.getUuid() : null).build();
        CreatePipelineCommand createCommand = suggestedLeader != null ? new CreatePipelineCommand(pipeline.getId(), pipeline.getType(), factor, dns, suggestedLeader) : new CreatePipelineCommand(pipeline.getId(), pipeline.getType(), factor, dns);
        createCommand.setTerm(this.scmContext.getTermOfLeader());
        dns.forEach(node -> {
            LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{}", (Object)pipeline.getId(), (Object)node.getUuidString());
            this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, (Object)new CommandForDatanode(node.getUuid(), (SCMCommand)createCommand));
        });
        return pipeline;
    }

    @Override
    public Pipeline create(RatisReplicationConfig replicationConfig, List<DatanodeDetails> nodes) {
        return Pipeline.newBuilder().setId(PipelineID.randomId()).setState(Pipeline.PipelineState.ALLOCATED).setReplicationConfig((ReplicationConfig)replicationConfig).setNodes(nodes).build();
    }

    @Override
    public Pipeline createForRead(RatisReplicationConfig replicationConfig, Set<ContainerReplica> replicas) {
        return this.create(replicationConfig, replicas.stream().map(ContainerReplica::getDatanodeDetails).collect(Collectors.toList()));
    }

    private List<DatanodeDetails> filterPipelineEngagement() {
        List<DatanodeDetails> healthyNodes = this.getNodeManager().getNodes(NodeStatus.inServiceHealthy());
        List<DatanodeDetails> excluded = healthyNodes.stream().map(d -> new PipelinePlacementPolicy.DnWithPipelines((DatanodeDetails)d, PipelinePlacementPolicy.currentRatisThreePipelineCount(this.getNodeManager(), this.getPipelineStateManager(), d))).filter(d -> d.getPipelines() >= this.getNodeManager().pipelineLimit(d.getDn())).sorted(Comparator.comparingInt(PipelinePlacementPolicy.DnWithPipelines::getPipelines)).map(d -> d.getDn()).collect(Collectors.toList());
        return excluded;
    }

    @Override
    public void shutdown() {
    }

    @Override
    public void close(Pipeline pipeline) throws NotLeaderException {
        ClosePipelineCommand closeCommand = new ClosePipelineCommand(pipeline.getId());
        closeCommand.setTerm(this.scmContext.getTermOfLeader());
        pipeline.getNodes().forEach(node -> {
            CommandForDatanode datanodeCommand = new CommandForDatanode(node.getUuid(), (SCMCommand)closeCommand);
            LOG.info("Send pipeline:{} close command to datanode {}", (Object)pipeline.getId(), (Object)datanodeCommand.getDatanodeId());
            this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, (Object)datanodeCommand);
        });
    }
}

