/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.container;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.GeneratedMessage;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.lock.LockManager;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicationManager
implements MetricsSource,
EventHandler<SCMSafeModeManager.SafeModeStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationManager.class);
    public static final String METRICS_SOURCE_NAME = "SCMReplicationManager";
    private final ContainerManager containerManager;
    private final PlacementPolicy containerPlacement;
    private final EventPublisher eventPublisher;
    private final LockManager<ContainerID> lockManager;
    private final Map<ContainerID, List<InflightAction>> inflightReplication;
    private final Map<ContainerID, List<InflightAction>> inflightDeletion;
    private final ReplicationManagerConfiguration conf;
    private Thread replicationMonitor;
    private volatile boolean running;
    private final NodeManager nodeManager;

    public ReplicationManager(ReplicationManagerConfiguration conf, ContainerManager containerManager, PlacementPolicy containerPlacement, EventPublisher eventPublisher, LockManager<ContainerID> lockManager, NodeManager nodeManager) {
        this.containerManager = containerManager;
        this.containerPlacement = containerPlacement;
        this.eventPublisher = eventPublisher;
        this.lockManager = lockManager;
        this.conf = conf;
        this.running = false;
        this.inflightReplication = new ConcurrentHashMap<ContainerID, List<InflightAction>>();
        this.inflightDeletion = new ConcurrentHashMap<ContainerID, List<InflightAction>>();
        this.nodeManager = nodeManager;
    }

    public synchronized void start() {
        if (!this.isRunning()) {
            DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME, "SCM Replication manager (closed container replication) related metrics", (Object)this);
            LOG.info("Starting Replication Monitor Thread.");
            this.running = true;
            this.replicationMonitor = new Thread(this::run);
            this.replicationMonitor.setName("ReplicationMonitor");
            this.replicationMonitor.setDaemon(true);
            this.replicationMonitor.start();
        } else {
            LOG.info("Replication Monitor Thread is already running.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isRunning() {
        if (!this.running) {
            ReplicationManager replicationManager = this;
            synchronized (replicationManager) {
                return this.replicationMonitor != null && this.replicationMonitor.isAlive();
            }
        }
        return true;
    }

    @VisibleForTesting
    @SuppressFBWarnings(value={"NN_NAKED_NOTIFY"}, justification="Used only for testing")
    public synchronized void processContainersNow() {
        this.notifyAll();
    }

    public synchronized void stop() {
        if (this.running) {
            LOG.info("Stopping Replication Monitor Thread.");
            this.inflightReplication.clear();
            this.inflightDeletion.clear();
            this.running = false;
            DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
            this.notifyAll();
        } else {
            LOG.info("Replication Monitor Thread is not running.");
        }
    }

    private synchronized void run() {
        try {
            while (this.running) {
                long start = Time.monotonicNow();
                Set<ContainerID> containerIds = this.containerManager.getContainerIDs();
                containerIds.forEach(this::processContainer);
                LOG.info("Replication Monitor Thread took {} milliseconds for processing {} containers.", (Object)(Time.monotonicNow() - start), (Object)containerIds.size());
                this.wait(this.conf.getInterval());
            }
        }
        catch (Throwable t) {
            LOG.error("Exception in Replication Monitor Thread.", t);
            ExitUtil.terminate((int)1, (Throwable)t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processContainer(ContainerID id) {
        this.lockManager.lock((Object)id);
        try {
            ContainerInfo container = this.containerManager.getContainer(id);
            Set<ContainerReplica> replicas = this.containerManager.getContainerReplicas(container.containerID());
            HddsProtos.LifeCycleState state = container.getState();
            if (state == HddsProtos.LifeCycleState.OPEN) {
                if (!this.isContainerHealthy(container, replicas)) {
                    this.eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, (Object)id);
                }
                return;
            }
            if (state == HddsProtos.LifeCycleState.CLOSING) {
                replicas.forEach(replica -> this.sendCloseCommand(container, replica.getDatanodeDetails(), false));
                return;
            }
            if (state == HddsProtos.LifeCycleState.QUASI_CLOSED && this.canForceCloseContainer(container, replicas)) {
                this.forceCloseContainer(container, replicas);
                return;
            }
            this.updateInflightAction(container, this.inflightReplication, action -> replicas.stream().anyMatch(r -> r.getDatanodeDetails().equals((Object)((InflightAction)action).datanode)));
            this.updateInflightAction(container, this.inflightDeletion, action -> replicas.stream().noneMatch(r -> r.getDatanodeDetails().equals((Object)((InflightAction)action).datanode)));
            if (this.isContainerHealthy(container, replicas)) {
                return;
            }
            if (this.isContainerUnderReplicated(container, replicas)) {
                this.handleUnderReplicatedContainer(container, replicas);
                return;
            }
            if (this.isContainerOverReplicated(container, replicas)) {
                this.handleOverReplicatedContainer(container, replicas);
                return;
            }
            this.handleUnstableContainer(container, replicas);
        }
        catch (ContainerNotFoundException ex) {
            LOG.warn("Missing container {}.", (Object)id);
        }
        catch (Exception ex) {
            LOG.warn("Process container {} error: ", (Object)id, (Object)ex);
        }
        finally {
            this.lockManager.unlock((Object)id);
        }
    }

    private void updateInflightAction(ContainerInfo container, Map<ContainerID, List<InflightAction>> inflightActions, Predicate<InflightAction> filter) {
        ContainerID id = container.containerID();
        long deadline = Time.monotonicNow() - this.conf.getEventTimeout();
        if (inflightActions.containsKey(id)) {
            List<InflightAction> actions = inflightActions.get(id);
            actions.removeIf(action -> this.nodeManager.getNodeState(((InflightAction)action).datanode) != HddsProtos.NodeState.HEALTHY);
            actions.removeIf(action -> ((InflightAction)action).time < deadline);
            actions.removeIf(filter);
            if (actions.isEmpty()) {
                inflightActions.remove(id);
            }
        }
    }

    private boolean isContainerHealthy(ContainerInfo container, Set<ContainerReplica> replicas) {
        return !this.isContainerUnderReplicated(container, replicas) && !this.isContainerOverReplicated(container, replicas) && replicas.stream().allMatch(r -> ReplicationManager.compareState(container.getState(), r.getState()));
    }

    private boolean isContainerUnderReplicated(ContainerInfo container, Set<ContainerReplica> replicas) {
        boolean misReplicated = !this.getPlacementStatus(replicas, container.getReplicationFactor().getNumber()).isPolicySatisfied();
        return container.getReplicationFactor().getNumber() > this.getReplicaCount(container.containerID(), replicas) || misReplicated;
    }

    private boolean isContainerOverReplicated(ContainerInfo container, Set<ContainerReplica> replicas) {
        return container.getReplicationFactor().getNumber() < this.getReplicaCount(container.containerID(), replicas);
    }

    private int getReplicaCount(ContainerID id, Set<ContainerReplica> replicas) {
        return replicas.size() + this.inflightReplication.getOrDefault(id, Collections.emptyList()).size() - this.inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
    }

    private boolean canForceCloseContainer(ContainerInfo container, Set<ContainerReplica> replicas) {
        Preconditions.assertTrue((container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED ? 1 : 0) != 0);
        int replicationFactor = container.getReplicationFactor().getNumber();
        long uniqueQuasiClosedReplicaCount = replicas.stream().filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED).map(ContainerReplica::getOriginDatanodeId).distinct().count();
        return uniqueQuasiClosedReplicaCount > (long)(replicationFactor / 2);
    }

    private void forceCloseContainer(ContainerInfo container, Set<ContainerReplica> replicas) {
        Preconditions.assertTrue((container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED ? 1 : 0) != 0);
        List quasiClosedReplicas = replicas.stream().filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED).collect(Collectors.toList());
        Long sequenceId = quasiClosedReplicas.stream().map(ContainerReplica::getSequenceId).max(Long::compare).orElse(-1L);
        LOG.info("Force closing container {} with BCSID {}, which is in QUASI_CLOSED state.", (Object)container.containerID(), (Object)sequenceId);
        quasiClosedReplicas.stream().filter(r -> sequenceId != -1L).filter(replica -> replica.getSequenceId().equals(sequenceId)).forEach(replica -> this.sendCloseCommand(container, replica.getDatanodeDetails(), true));
    }

    private void handleUnderReplicatedContainer(ContainerInfo container, Set<ContainerReplica> replicas) {
        LOG.debug("Handling under-replicated container: {}", (Object)container.getContainerID());
        try {
            ContainerID id = container.containerID();
            List deletionInFlight = this.inflightDeletion.getOrDefault(id, Collections.emptyList()).stream().map(action -> ((InflightAction)action).datanode).collect(Collectors.toList());
            List replicationInFlight = this.inflightReplication.getOrDefault(id, Collections.emptyList()).stream().map(action -> ((InflightAction)action).datanode).collect(Collectors.toList());
            List<DatanodeDetails> source = replicas.stream().filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED || r.getState() == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED).filter(r -> !deletionInFlight.contains(r.getDatanodeDetails())).sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId())).map(ContainerReplica::getDatanodeDetails).collect(Collectors.toList());
            if (source.size() > 0) {
                int replicasNeeded;
                int replicationFactor = container.getReplicationFactor().getNumber();
                ArrayList targetReplicas = new ArrayList(source);
                targetReplicas.addAll(replicationInFlight);
                ContainerPlacementStatus placementStatus = this.containerPlacement.validateContainerPlacement(targetReplicas, replicationFactor);
                int delta = replicationFactor - this.getReplicaCount(id, replicas);
                int misRepDelta = placementStatus.misReplicationCount();
                int n = replicasNeeded = delta < misRepDelta ? misRepDelta : delta;
                if (replicasNeeded <= 0) {
                    LOG.debug("Container {} meets replication requirement with inflight replicas", (Object)id);
                    return;
                }
                List excludeList = replicas.stream().map(ContainerReplica::getDatanodeDetails).collect(Collectors.toList());
                excludeList.addAll(replicationInFlight);
                List selectedDatanodes = this.containerPlacement.chooseDatanodes(excludeList, null, replicasNeeded, container.getUsedBytes());
                if (delta > 0) {
                    LOG.info("Container {} is under replicated. Expected replica count is {}, but found {}.", new Object[]{id, replicationFactor, replicationFactor - delta});
                }
                int newMisRepDelta = misRepDelta;
                if (misRepDelta > 0) {
                    LOG.info("Container: {}. {}", (Object)id, (Object)placementStatus.misReplicatedReason());
                    targetReplicas.addAll(selectedDatanodes);
                    newMisRepDelta = this.containerPlacement.validateContainerPlacement(targetReplicas, replicationFactor).misReplicationCount();
                }
                if (delta > 0 || newMisRepDelta < misRepDelta) {
                    for (DatanodeDetails datanode : selectedDatanodes) {
                        this.sendReplicateCommand(container, datanode, source);
                    }
                } else {
                    LOG.warn("Container {} is mis-replicated, requiring {} additional replicas. After selecting new nodes, mis-replication has not improved. No additional replicas will be scheduled", (Object)id, (Object)misRepDelta);
                }
            } else {
                LOG.warn("Cannot replicate container {}, no healthy replica found.", (Object)container.containerID());
            }
        }
        catch (IOException ex) {
            LOG.warn("Exception while replicating container {}.", (Object)container.getContainerID(), (Object)ex);
        }
    }

    private void handleOverReplicatedContainer(ContainerInfo container, Set<ContainerReplica> replicas) {
        ContainerID id = container.containerID();
        int replicationFactor = container.getReplicationFactor().getNumber();
        int excess = replicas.size() - replicationFactor - this.inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
        if (excess > 0) {
            LOG.info("Container {} is over replicated. Expected replica count is {}, but found {}.", new Object[]{id, replicationFactor, replicationFactor + excess});
            LinkedHashMap uniqueReplicas = new LinkedHashMap();
            replicas.stream().filter(r -> ReplicationManager.compareState(container.getState(), r.getState())).forEach(r -> uniqueReplicas.putIfAbsent(r.getOriginDatanodeId(), r));
            ArrayList<ContainerReplica> eligibleReplicas = new ArrayList<ContainerReplica>(replicas);
            eligibleReplicas.removeAll(uniqueReplicas.values());
            List unhealthyReplicas = eligibleReplicas.stream().filter(r -> !ReplicationManager.compareState(container.getState(), r.getState())).collect(Collectors.toList());
            Iterator iterator = unhealthyReplicas.iterator();
            if (iterator.hasNext()) {
                ContainerReplica r2 = (ContainerReplica)iterator.next();
                if (excess > 0) {
                    this.sendDeleteCommand(container, r2.getDatanodeDetails(), true);
                    --excess;
                }
            }
            if (excess > 0) {
                eligibleReplicas.removeAll(unhealthyReplicas);
                HashSet<ContainerReplica> replicaSet = new HashSet<ContainerReplica>(eligibleReplicas);
                boolean misReplicated = this.getPlacementStatus(replicaSet, replicationFactor).isPolicySatisfied();
                for (ContainerReplica r3 : eligibleReplicas) {
                    if (excess <= 0) break;
                    replicaSet.remove(r3);
                    boolean nowMisRep = this.getPlacementStatus(replicaSet, replicationFactor).isPolicySatisfied();
                    if (misReplicated || !nowMisRep) {
                        this.sendDeleteCommand(container, r3.getDatanodeDetails(), true);
                        --excess;
                        continue;
                    }
                    replicaSet.add(r3);
                }
                if (excess > 0) {
                    LOG.info("The container {} is over replicated with {} excess replica. The excess replicas cannot be removed without violating the placement policy", (Object)container, (Object)excess);
                }
            }
        }
    }

    private ContainerPlacementStatus getPlacementStatus(Set<ContainerReplica> replicas, int replicationFactor) {
        List replicaDns = replicas.stream().map(c -> c.getDatanodeDetails()).collect(Collectors.toList());
        return this.containerPlacement.validateContainerPlacement(replicaDns, replicationFactor);
    }

    private void handleUnstableContainer(ContainerInfo container, Set<ContainerReplica> replicas) {
        List unhealthyReplicas = replicas.stream().filter(r -> !ReplicationManager.compareState(container.getState(), r.getState())).collect(Collectors.toList());
        Iterator iterator = unhealthyReplicas.iterator();
        while (iterator.hasNext()) {
            ContainerReplica replica2 = (ContainerReplica)iterator.next();
            StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State state = replica2.getState();
            if (state == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN || state == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSING) {
                this.sendCloseCommand(container, replica2.getDatanodeDetails(), false);
                iterator.remove();
            }
            if (state != StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED || container.getSequenceId() != replica2.getSequenceId().longValue()) continue;
            this.sendCloseCommand(container, replica2.getDatanodeDetails(), true);
            iterator.remove();
        }
        unhealthyReplicas.stream().findFirst().ifPresent(replica -> this.sendDeleteCommand(container, replica.getDatanodeDetails(), false));
    }

    private void sendCloseCommand(ContainerInfo container, DatanodeDetails datanode, boolean force) {
        LOG.info("Sending close container command for container {} to datanode {}.", (Object)container.containerID(), (Object)datanode);
        CloseContainerCommand closeContainerCommand = new CloseContainerCommand(container.getContainerID(), container.getPipelineID(), force);
        this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, (Object)new CommandForDatanode(datanode.getUuid(), (SCMCommand)closeContainerCommand));
    }

    private void sendReplicateCommand(ContainerInfo container, DatanodeDetails datanode, List<DatanodeDetails> sources) {
        LOG.info("Sending replicate container command for container {} to datanode {}", (Object)container.containerID(), (Object)datanode);
        ContainerID id = container.containerID();
        ReplicateContainerCommand replicateCommand = new ReplicateContainerCommand(id.getId(), sources);
        this.inflightReplication.computeIfAbsent(id, k -> new ArrayList());
        this.sendAndTrackDatanodeCommand(datanode, (SCMCommand)replicateCommand, action -> this.inflightReplication.get(id).add((InflightAction)action));
    }

    private void sendDeleteCommand(ContainerInfo container, DatanodeDetails datanode, boolean force) {
        LOG.info("Sending delete container command for container {} to datanode {}", (Object)container.containerID(), (Object)datanode);
        ContainerID id = container.containerID();
        DeleteContainerCommand deleteCommand = new DeleteContainerCommand(id.getId(), force);
        this.inflightDeletion.computeIfAbsent(id, k -> new ArrayList());
        this.sendAndTrackDatanodeCommand(datanode, (SCMCommand)deleteCommand, action -> this.inflightDeletion.get(id).add((InflightAction)action));
    }

    private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand(DatanodeDetails datanode, SCMCommand<T> command, Consumer<InflightAction> tracker) {
        CommandForDatanode datanodeCommand = new CommandForDatanode(datanode.getUuid(), command);
        this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, (Object)datanodeCommand);
        tracker.accept(new InflightAction(datanode, Time.monotonicNow()));
    }

    private static boolean compareState(HddsProtos.LifeCycleState containerState, StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State replicaState) {
        switch (containerState) {
            case OPEN: {
                return replicaState == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
            }
            case CLOSING: {
                return replicaState == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSING;
            }
            case QUASI_CLOSED: {
                return replicaState == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED;
            }
            case CLOSED: {
                return replicaState == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
            }
            case DELETING: {
                return false;
            }
            case DELETED: {
                return false;
            }
        }
        return false;
    }

    public void getMetrics(MetricsCollector collector, boolean all) {
        collector.addRecord(ReplicationManager.class.getSimpleName()).addGauge((MetricsInfo)ReplicationManagerMetrics.INFLIGHT_REPLICATION, this.inflightReplication.size()).addGauge((MetricsInfo)ReplicationManagerMetrics.INFLIGHT_DELETION, this.inflightDeletion.size()).endRecord();
    }

    public void onMessage(SCMSafeModeManager.SafeModeStatus status, EventPublisher publisher) {
        if (!status.isInSafeMode() && !this.isRunning()) {
            this.start();
        }
    }

    public static enum ReplicationManagerMetrics implements MetricsInfo
    {
        INFLIGHT_REPLICATION("Tracked inflight container replication requests."),
        INFLIGHT_DELETION("Tracked inflight container deletion requests.");

        private final String desc;

        private ReplicationManagerMetrics(String desc) {
            this.desc = desc;
        }

        public String description() {
            return this.desc;
        }

        public String toString() {
            return new StringJoiner(", ", ((Object)((Object)this)).getClass().getSimpleName() + "{", "}").add("name=" + this.name()).add("description=" + this.desc).toString();
        }
    }

    @ConfigGroup(prefix="hdds.scm.replication")
    public static class ReplicationManagerConfiguration {
        @Config(key="thread.interval", type=ConfigType.TIME, defaultValue="300s", tags={ConfigTag.SCM, ConfigTag.OZONE}, description="There is a replication monitor thread running inside SCM which takes care of replicating the containers in the cluster. This property is used to configure the interval in which that thread runs.")
        private long interval = Duration.ofSeconds(300L).toMillis();
        @Config(key="event.timeout", type=ConfigType.TIME, defaultValue="30m", tags={ConfigTag.SCM, ConfigTag.OZONE}, description="Timeout for the container replication/deletion commands sent  to datanodes. After this timeout the command will be retried.")
        private long eventTimeout = Duration.ofMinutes(30L).toMillis();

        public void setInterval(Duration interval) {
            this.interval = interval.toMillis();
        }

        public void setEventTimeout(Duration timeout) {
            this.eventTimeout = timeout.toMillis();
        }

        public long getInterval() {
            return this.interval;
        }

        public long getEventTimeout() {
            return this.eventTimeout;
        }
    }

    private static final class InflightAction {
        private final DatanodeDetails datanode;
        private final long time;

        private InflightAction(DatanodeDetails datanode, long time) {
            this.datanode = datanode;
            this.time = time;
        }
    }
}

