/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.pipeline;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.ObjectName;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.BackgroundPipelineCreator;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineFactory;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineMetrics;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SCMPipelineManager
implements PipelineManager {
    private static final Logger LOG = LoggerFactory.getLogger(SCMPipelineManager.class);
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private PipelineFactory pipelineFactory;
    private PipelineStateManager stateManager;
    private final BackgroundPipelineCreator backgroundPipelineCreator;
    private Scheduler scheduler;
    private final EventPublisher eventPublisher;
    private final NodeManager nodeManager;
    private final SCMPipelineMetrics metrics;
    private final ConfigurationSource conf;
    private long pipelineWaitDefaultTimeout;
    private ObjectName pmInfoBean;
    private Table<PipelineID, Pipeline> pipelineStore;
    private final AtomicBoolean isInSafeMode;
    private final AtomicBoolean pipelineCreationAllowed;

    public SCMPipelineManager(ConfigurationSource conf, NodeManager nodeManager, Table<PipelineID, Pipeline> pipelineStore, EventPublisher eventPublisher) throws IOException {
        this(conf, nodeManager, pipelineStore, eventPublisher, null, null);
        this.stateManager = new PipelineStateManager();
        this.pipelineFactory = new PipelineFactory(nodeManager, this.stateManager, conf, eventPublisher);
        this.pipelineStore = pipelineStore;
        this.initializePipelineState();
    }

    protected SCMPipelineManager(ConfigurationSource conf, NodeManager nodeManager, Table<PipelineID, Pipeline> pipelineStore, EventPublisher eventPublisher, PipelineStateManager pipelineStateManager, PipelineFactory pipelineFactory) throws IOException {
        this.pipelineStore = pipelineStore;
        this.conf = conf;
        this.pipelineFactory = pipelineFactory;
        this.stateManager = pipelineStateManager;
        this.scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
        this.backgroundPipelineCreator = new BackgroundPipelineCreator(this, this.scheduler, conf);
        this.eventPublisher = eventPublisher;
        this.nodeManager = nodeManager;
        this.metrics = SCMPipelineMetrics.create();
        this.pmInfoBean = MBeans.register((String)"SCMPipelineManager", (String)"SCMPipelineManagerInfo", (Object)this);
        this.pipelineWaitDefaultTimeout = conf.getTimeDuration("hdds.pipeline.report.interval", "60s", TimeUnit.MILLISECONDS);
        this.isInSafeMode = new AtomicBoolean(conf.getBoolean("hdds.scm.safemode.enabled", true));
        this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
    }

    public PipelineStateManager getStateManager() {
        return this.stateManager;
    }

    @VisibleForTesting
    public void setPipelineProvider(HddsProtos.ReplicationType replicationType, PipelineProvider provider) {
        this.pipelineFactory.setProvider(replicationType, provider);
    }

    @VisibleForTesting
    public void allowPipelineCreation() {
        this.pipelineCreationAllowed.set(true);
    }

    @VisibleForTesting
    public boolean isPipelineCreationAllowed() {
        return this.pipelineCreationAllowed.get();
    }

    protected void initializePipelineState() throws IOException {
        if (this.pipelineStore.isEmpty()) {
            LOG.info("No pipeline exists in current db");
            return;
        }
        TableIterator iterator = this.pipelineStore.iterator();
        while (iterator.hasNext()) {
            Pipeline pipeline = this.nextPipelineFromIterator((TableIterator<PipelineID, ? extends Table.KeyValue<PipelineID, Pipeline>>)iterator);
            this.stateManager.addPipeline(pipeline);
            this.nodeManager.addPipeline(pipeline);
        }
    }

    private Pipeline nextPipelineFromIterator(TableIterator<PipelineID, ? extends Table.KeyValue<PipelineID, Pipeline>> it) throws IOException {
        Table.KeyValue actual = (Table.KeyValue)it.next();
        Pipeline pipeline = (Pipeline)actual.getValue();
        PipelineID pipelineID = (PipelineID)actual.getKey();
        this.checkKeyAndReplaceIfObsolete(it, pipeline, pipelineID);
        return pipeline;
    }

    private void checkKeyAndReplaceIfObsolete(TableIterator<PipelineID, ? extends Table.KeyValue<PipelineID, Pipeline>> it, Pipeline pipeline, PipelineID pipelineID) {
        if (!pipelineID.equals((Object)pipeline.getId())) {
            try {
                LOG.info("Found pipeline in old format key : {}", (Object)pipeline.getId());
                it.removeFromDB();
                this.pipelineStore.put((Object)pipeline.getId(), (Object)pipeline);
            }
            catch (IOException e) {
                LOG.info("Pipeline table in RocksDB has an old key format, and removing the pipeline with the old key was unsuccessful.Pipeline: {}", (Object)pipeline);
            }
        }
    }

    private void recordMetricsForPipeline(Pipeline pipeline) {
        this.metrics.incNumPipelineAllocated();
        if (pipeline.isOpen()) {
            this.metrics.incNumPipelineCreated();
            this.metrics.createPerPipelineMetrics(pipeline);
        }
        switch (pipeline.getType()) {
            case STAND_ALONE: {
                return;
            }
            case RATIS: {
                List<Pipeline> overlapPipelines = RatisPipelineUtils.checkPipelineContainSameDatanodes(this.stateManager, pipeline);
                if (!overlapPipelines.isEmpty()) {
                    this.metrics.incNumPipelineContainSameDatanodes();
                    for (Pipeline overlapPipeline : overlapPipelines) {
                        LOG.info("Pipeline: " + pipeline.getId().toString() + " contains same datanodes as previous pipelines: " + overlapPipeline.getId().toString() + " nodeIds: " + ((DatanodeDetails)pipeline.getNodes().get(0)).getUuid().toString() + ", " + ((DatanodeDetails)pipeline.getNodes().get(1)).getUuid().toString() + ", " + ((DatanodeDetails)pipeline.getNodes().get(2)).getUuid().toString());
                    }
                }
                return;
            }
        }
    }

    @Override
    public Pipeline createPipeline(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) throws IOException {
        if (!this.isPipelineCreationAllowed() && factor != HddsProtos.ReplicationFactor.ONE) {
            LOG.debug("Pipeline creation is not allowed until safe mode prechecks complete");
            throw new IOException("Pipeline creation is not allowed as safe mode prechecks have not yet passed");
        }
        this.lock.writeLock().lock();
        try {
            Pipeline pipeline = this.pipelineFactory.create(type, factor);
            this.pipelineStore.put((Object)pipeline.getId(), (Object)pipeline);
            this.stateManager.addPipeline(pipeline);
            this.nodeManager.addPipeline(pipeline);
            this.recordMetricsForPipeline(pipeline);
            Pipeline pipeline2 = pipeline;
            return pipeline2;
        }
        catch (IOException ex) {
            if (ex instanceof SCMException && ((SCMException)ex).getResult() == SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE) {
                LOG.debug("Can't create more pipelines of type {} and factor {}. Reason: {}", new Object[]{type, factor, ex.getMessage()});
            } else {
                LOG.error("Failed to create pipeline of type {} and factor {}. Exception: {}", new Object[]{type, factor, ex.getMessage()});
            }
            this.metrics.incNumPipelineCreationFailed();
            throw ex;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Pipeline createPipeline(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, List<DatanodeDetails> nodes) {
        this.lock.writeLock().lock();
        try {
            Pipeline pipeline = this.pipelineFactory.create(type, factor, nodes);
            return pipeline;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override
    public Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException {
        this.lock.readLock().lock();
        try {
            Pipeline pipeline = this.stateManager.getPipeline(pipelineID);
            return pipeline;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean containsPipeline(PipelineID pipelineID) {
        this.lock.readLock().lock();
        try {
            this.getPipeline(pipelineID);
            boolean bl = true;
            return bl;
        }
        catch (PipelineNotFoundException e) {
            boolean bl = false;
            return bl;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    @Override
    public List<Pipeline> getPipelines() {
        this.lock.readLock().lock();
        try {
            List<Pipeline> list = this.stateManager.getPipelines();
            return list;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    @Override
    public List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
        this.lock.readLock().lock();
        try {
            List<Pipeline> list = this.stateManager.getPipelines(type);
            return list;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<Pipeline> getPipelines(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) {
        this.lock.readLock().lock();
        try {
            List<Pipeline> list = this.stateManager.getPipelines(type, factor);
            return list;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<Pipeline> getPipelines(HddsProtos.ReplicationType type, Pipeline.PipelineState state) {
        this.lock.readLock().lock();
        try {
            List<Pipeline> list = this.stateManager.getPipelines(type, state);
            return list;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<Pipeline> getPipelines(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, Pipeline.PipelineState state) {
        this.lock.readLock().lock();
        try {
            List<Pipeline> list = this.stateManager.getPipelines(type, factor, state);
            return list;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<Pipeline> getPipelines(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns, Collection<PipelineID> excludePipelines) {
        this.lock.readLock().lock();
        try {
            List<Pipeline> list = this.stateManager.getPipelines(type, factor, state, excludeDns, excludePipelines);
            return list;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    @Override
    public void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException {
        this.lock.writeLock().lock();
        try {
            this.stateManager.addContainerToPipeline(pipelineID, containerID);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override
    public void removeContainerFromPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException {
        this.lock.writeLock().lock();
        try {
            this.stateManager.removeContainerFromPipeline(pipelineID, containerID);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override
    public NavigableSet<ContainerID> getContainersInPipeline(PipelineID pipelineID) throws IOException {
        this.lock.readLock().lock();
        try {
            NavigableSet<ContainerID> navigableSet = this.stateManager.getContainers(pipelineID);
            return navigableSet;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    @Override
    public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
        return this.stateManager.getNumberOfContainers(pipelineID);
    }

    @Override
    public void openPipeline(PipelineID pipelineId) throws IOException {
        this.lock.writeLock().lock();
        try {
            Pipeline pipeline = this.stateManager.openPipeline(pipelineId);
            this.metrics.incNumPipelineCreated();
            this.metrics.createPerPipelineMetrics(pipeline);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override
    public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) throws IOException {
        LOG.info("Destroying pipeline:{}", (Object)pipeline);
        this.finalizePipeline(pipeline.getId());
        if (onTimeout) {
            long pipelineDestroyTimeoutInMillis = this.conf.getTimeDuration("ozone.scm.pipeline.destroy.timeout", "66s", TimeUnit.MILLISECONDS);
            this.scheduler.schedule(() -> this.destroyPipeline(pipeline), pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG, String.format("Destroy pipeline failed for pipeline:%s", pipeline));
        } else {
            this.destroyPipeline(pipeline);
        }
    }

    @Override
    public void scrubPipeline(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) throws IOException {
        if (type != HddsProtos.ReplicationType.RATIS || factor != HddsProtos.ReplicationFactor.THREE) {
            return;
        }
        Instant currentTime = Instant.now();
        Long pipelineScrubTimeoutInMills = this.conf.getTimeDuration("ozone.scm.pipeline.allocated.timeout", "5m", TimeUnit.MILLISECONDS);
        List needToSrubPipelines = this.stateManager.getPipelines(type, factor, Pipeline.PipelineState.ALLOCATED).stream().filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp().toEpochMilli() >= pipelineScrubTimeoutInMills).collect(Collectors.toList());
        for (Pipeline p2 : needToSrubPipelines) {
            LOG.info("Scrubbing pipeline: id: " + p2.getId().toString() + " since it stays at ALLOCATED stage for " + Duration.between(currentTime, p2.getCreationTimestamp()).toMinutes() + " mins.");
            this.finalizeAndDestroyPipeline(p2, false);
        }
    }

    @Override
    public Map<String, Integer> getPipelineInfo() {
        HashMap<String, Integer> pipelineInfo = new HashMap<String, Integer>();
        for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
            pipelineInfo.put(state.toString(), 0);
        }
        this.stateManager.getPipelines().forEach(pipeline -> pipelineInfo.computeIfPresent(pipeline.getPipelineState().toString(), (k, v) -> v + 1));
        return pipelineInfo;
    }

    @Override
    public void startPipelineCreator() {
        this.backgroundPipelineCreator.startFixedIntervalPipelineCreator();
    }

    @Override
    public void triggerPipelineCreation() {
        this.backgroundPipelineCreator.triggerPipelineCreation();
    }

    @Override
    public void activatePipeline(PipelineID pipelineID) throws IOException {
        this.stateManager.activatePipeline(pipelineID);
    }

    @Override
    public void deactivatePipeline(PipelineID pipelineID) throws IOException {
        this.stateManager.deactivatePipeline(pipelineID);
    }

    @Override
    public void waitPipelineReady(PipelineID pipelineID, long timeout) throws IOException {
        boolean ready;
        long st = Time.monotonicNow();
        if (timeout == 0L) {
            timeout = this.pipelineWaitDefaultTimeout;
        }
        do {
            Pipeline pipeline;
            try {
                pipeline = this.stateManager.getPipeline(pipelineID);
            }
            catch (PipelineNotFoundException e) {
                throw new PipelineNotFoundException(String.format("Pipeline %s cannot be found", pipelineID));
            }
            ready = pipeline.isOpen();
            if (ready) continue;
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } while (!ready && Time.monotonicNow() - st < timeout);
        if (!ready) {
            throw new IOException(String.format("Pipeline %s is not ready in %d ms", pipelineID, timeout));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void finalizePipeline(PipelineID pipelineId) throws IOException {
        this.lock.writeLock().lock();
        try {
            this.stateManager.finalizePipeline(pipelineId);
            NavigableSet<ContainerID> containerIDs = this.stateManager.getContainers(pipelineId);
            for (ContainerID containerID : containerIDs) {
                this.eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, (Object)containerID);
            }
            this.metrics.removePipelineMetrics(pipelineId);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    protected void destroyPipeline(Pipeline pipeline) throws IOException {
        this.pipelineFactory.close(pipeline.getType(), pipeline);
        this.removePipeline(pipeline.getId());
        this.triggerPipelineCreation();
    }

    protected void removePipeline(PipelineID pipelineId) throws IOException {
        this.lock.writeLock().lock();
        try {
            if (this.pipelineStore != null) {
                this.pipelineStore.delete((Object)pipelineId);
                Pipeline pipeline = this.stateManager.removePipeline(pipelineId);
                this.nodeManager.removePipeline(pipeline);
                this.metrics.incNumPipelineDestroyed();
            }
        }
        catch (IOException ex) {
            this.metrics.incNumPipelineDestroyFailed();
            throw ex;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override
    public void incNumBlocksAllocatedMetric(PipelineID id) {
        this.metrics.incNumBlocksAllocated(id);
    }

    @Override
    public void close() throws IOException {
        if (this.scheduler != null) {
            this.scheduler.close();
            this.scheduler = null;
        }
        if (this.pmInfoBean != null) {
            MBeans.unregister((ObjectName)this.pmInfoBean);
            this.pmInfoBean = null;
        }
        SCMPipelineMetrics.unRegister();
        this.pipelineFactory.shutdown();
    }

    protected ReadWriteLock getLock() {
        return this.lock;
    }

    @VisibleForTesting
    public PipelineFactory getPipelineFactory() {
        return this.pipelineFactory;
    }

    protected NodeManager getNodeManager() {
        return this.nodeManager;
    }

    @Override
    public boolean getSafeModeStatus() {
        return this.isInSafeMode.get();
    }

    public Table<PipelineID, Pipeline> getPipelineStore() {
        return this.pipelineStore;
    }

    public void onMessage(SCMSafeModeManager.SafeModeStatus status, EventPublisher publisher) {
        boolean currentAllowPipelines = this.pipelineCreationAllowed.getAndSet(status.isPreCheckComplete());
        boolean currentlyInSafeMode = this.isInSafeMode.getAndSet(status.isInSafeMode());
        if (this.isPipelineCreationAllowed() && !currentAllowPipelines) {
            this.triggerPipelineCreation();
        }
        if (!this.getSafeModeStatus() && currentlyInSafeMode) {
            this.startPipelineCreator();
        }
    }

    @VisibleForTesting
    protected static Logger getLog() {
        return LOG;
    }
}

