/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.pipeline;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.BackgroundPipelineCreator;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineFactory;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineMetrics;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.MetadataStore;
import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SCMPipelineManager
implements PipelineManager {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(SCMPipelineManager.class);
    /** Read/write lock acquired by most methods of this class around state-manager and store access. */
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    /** Used to create and close pipelines; non-final because the public constructor assigns it after delegating. */
    private PipelineFactory pipelineFactory;
    /** Holds the in-memory pipeline state; non-final because the public constructor assigns it after delegating. */
    private PipelineStateManager stateManager;
    /** Started and triggered through {@code startPipelineCreator()} / {@code triggerPipelineCreation()}. */
    private final BackgroundPipelineCreator backgroundPipelineCreator;
    /** Runs delayed pipeline destruction; set to null by {@code close()}. */
    private Scheduler scheduler;
    /** Persistent store of pipelines (id bytes mapped to serialized pipeline bytes); set to null by {@code close()}. */
    private MetadataStore pipelineStore;
    /** Receives the CLOSE_CONTAINER events fired while finalizing a pipeline. */
    private final EventPublisher eventPublisher;
    /** Notified whenever a pipeline is added or removed. */
    private final NodeManager nodeManager;
    /** Pipeline counters and per-pipeline metrics. */
    private final SCMPipelineMetrics metrics;
    /** Configuration the timeouts and store settings are read from. */
    private final Configuration conf;
    /** Value of "hdds.pipeline.report.interval" in milliseconds; used by {@code waitPipelineReady} when the caller passes 0. */
    private long pipelineWaitDefaultTimeout;
    /** Handle returned by the MBean registration; unregistered and set to null by {@code close()}. */
    private ObjectName pmInfoBean;
    /** Safe-mode flag, initialised from "hdds.scm.safemode.enabled" (default true). */
    private final AtomicBoolean isInSafeMode;

    public SCMPipelineManager(Configuration conf, NodeManager nodeManager, EventPublisher eventPublisher) throws IOException {
        this(conf, nodeManager, eventPublisher, null, null);
        this.stateManager = new PipelineStateManager();
        this.pipelineFactory = new PipelineFactory(nodeManager, this.stateManager, conf, eventPublisher);
        this.initializePipelineState();
    }

    /**
     * Wires up the manager with externally supplied state manager and factory.
     * Creates the scheduler and background pipeline creator, opens the pipeline
     * store, creates the metrics, registers the MBean and reads the default wait
     * timeout and the initial safe-mode flag from the configuration.
     * Persisted pipelines are NOT loaded by this constructor.
     *
     * @param conf                 configuration to read settings from
     * @param nodeManager          node manager to notify about pipeline changes
     * @param eventPublisher       publisher used for events fired by this manager
     * @param pipelineStateManager state manager to use (the public constructor passes null)
     * @param pipelineFactory      pipeline factory to use (the public constructor passes null)
     * @throws IOException declared by this constructor; presumably raised when the store cannot be opened
     */
    protected SCMPipelineManager(Configuration conf, NodeManager nodeManager, EventPublisher eventPublisher, PipelineStateManager pipelineStateManager, PipelineFactory pipelineFactory) throws IOException {
        this.conf = conf;
        this.pipelineFactory = pipelineFactory;
        this.stateManager = pipelineStateManager;

        // The background creator shares the scheduler kept by this manager.
        final Scheduler creatorScheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
        this.scheduler = creatorScheduler;
        this.backgroundPipelineCreator = new BackgroundPipelineCreator(this, creatorScheduler, conf);

        // The cache size is configured in MB and handed to the store builder in bytes.
        final int cacheSizeMb = conf.getInt("ozone.scm.db.cache.size.mb", 128);
        final File dbFile = getPipelineDBPath(conf);
        final long cacheSizeBytes = cacheSizeMb * 1024L * 1024L;
        this.pipelineStore = MetadataStoreBuilder.newBuilder()
            .setCreateIfMissing(true)
            .setConf(conf)
            .setDbFile(dbFile)
            .setCacheSize(cacheSizeBytes)
            .build();

        this.eventPublisher = eventPublisher;
        this.nodeManager = nodeManager;
        this.metrics = SCMPipelineMetrics.create();
        this.pmInfoBean = MBeans.register("SCMPipelineManager", "SCMPipelineManagerInfo", this);
        this.pipelineWaitDefaultTimeout = conf.getTimeDuration("hdds.pipeline.report.interval", "60s", TimeUnit.MILLISECONDS);
        this.isInSafeMode = new AtomicBoolean(conf.getBoolean("hdds.scm.safemode.enabled", true));
    }

    /**
     * @return the pipeline state manager used by this manager
     */
    public PipelineStateManager getStateManager() {
        return stateManager;
    }

    /**
     * Installs a pipeline provider for the given replication type on the pipeline factory.
     *
     * @param replicationType replication type the provider is registered for
     * @param provider        provider handed to the factory
     */
    @VisibleForTesting
    public void setPipelineProvider(HddsProtos.ReplicationType replicationType, PipelineProvider provider) {
        pipelineFactory.setProvider(replicationType, provider);
    }

    /**
     * Loads every pipeline persisted in the pipeline store into the state manager
     * and the node manager. Does nothing except log when the store is empty.
     *
     * <p>Each loaded pipeline is forced into the PIPELINE_ALLOCATED state and its
     * creation timestamp is reset to the current time.
     *
     * @throws IOException if the store cannot be read, an entry cannot be parsed,
     *                     or a pipeline cannot be added to the state manager
     */
    protected void initializePipelineState() throws IOException {
        if (this.pipelineStore.isEmpty()) {
            LOG.info("No pipeline exists in current db");
            return;
        }
        List<Map.Entry<byte[], byte[]>> pipelines = this.pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, (MetadataKeyFilters.MetadataKeyFilter[])null);
        for (Map.Entry<byte[], byte[]> entry : pipelines) {
            HddsProtos.Pipeline storedProto = HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue());
            // The persisted state is discarded: every reloaded pipeline starts as ALLOCATED.
            HddsProtos.Pipeline allocatedProto = HddsProtos.Pipeline.newBuilder(storedProto)
                .setState(HddsProtos.PipelineState.PIPELINE_ALLOCATED)
                .build();
            Pipeline pipeline = Pipeline.getFromProtobuf(allocatedProto);
            // Validate before the first dereference so the check can actually fire.
            Preconditions.checkNotNull(pipeline);
            pipeline.setCreationTimestamp(Instant.now());
            this.stateManager.addPipeline(pipeline);
            this.nodeManager.addPipeline(pipeline);
        }
    }

    /**
     * Updates the metrics for a newly created pipeline.
     *
     * <p>Always counts the pipeline as allocated; additionally counts it as created
     * and creates its per-pipeline metrics when it is already open. For RATIS
     * pipelines it also checks for earlier pipelines made of the same datanodes,
     * counting one overlap and logging every overlapping pipeline.
     *
     * @param pipeline the pipeline that was just created
     */
    private void recordMetricsForPipeline(Pipeline pipeline) {
        this.metrics.incNumPipelineAllocated();
        if (pipeline.isOpen()) {
            this.metrics.incNumPipelineCreated();
            this.metrics.createPerPipelineMetrics(pipeline);
        }
        switch (pipeline.getType()) {
            case RATIS: {
                List<Pipeline> overlapPipelines = RatisPipelineUtils.checkPipelineContainSameDatanodes(this.stateManager, pipeline);
                if (overlapPipelines.isEmpty()) {
                    return;
                }
                // One overlap is counted per new pipeline, however many pipelines it overlaps with.
                this.metrics.incNumPipelineContainSameDatanodes();
                // Join all node ids rather than indexing 0..2, so pipelines with fewer
                // than three nodes cannot raise IndexOutOfBoundsException here.
                String nodeIds = pipeline.getNodes().stream()
                    .map(node -> node.getUuid().toString())
                    .collect(Collectors.joining(", "));
                for (Pipeline overlapPipeline : overlapPipelines) {
                    LOG.info("Pipeline: {} contains same datanodes as previous pipelines: {} nodeIds: {}", pipeline.getId(), overlapPipeline.getId(), nodeIds);
                }
                return;
            }
            default: {
                // Other replication types have no overlap check.
                return;
            }
        }
    }

    /**
     * Creates a pipeline through the pipeline factory, persists it in the pipeline
     * store, registers it with the state manager and node manager, and records its
     * metrics. All of this happens while holding the write lock.
     *
     * @param type   replication type of the pipeline
     * @param factor replication factor of the pipeline
     * @return the newly created pipeline
     * @throws IOException if any step fails; the creation-failed metric is
     *                     incremented before the exception is rethrown
     */
    @Override
    public synchronized Pipeline createPipeline(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) throws IOException {
        lock.writeLock().lock();
        try {
            final Pipeline created = pipelineFactory.create(type, factor);
            final byte[] storeKey = created.getId().getProtobuf().toByteArray();
            final byte[] storeValue = created.getProtobufMessage().toByteArray();
            pipelineStore.put(storeKey, storeValue);
            stateManager.addPipeline(created);
            nodeManager.addPipeline(created);
            recordMetricsForPipeline(created);
            return created;
        } catch (IOException ex) {
            metrics.incNumPipelineCreationFailed();
            throw ex;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Asks the pipeline factory for a pipeline over the given nodes while holding
     * the write lock. Unlike the two-argument overload, this method neither
     * persists the pipeline nor registers it with the state or node manager.
     *
     * @param type   replication type of the pipeline
     * @param factor replication factor of the pipeline
     * @param nodes  datanodes passed to the factory
     * @return the pipeline returned by the factory
     */
    @Override
    public Pipeline createPipeline(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, List<DatanodeDetails> nodes) {
        lock.writeLock().lock();
        try {
            return pipelineFactory.create(type, factor, nodes);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Looks up a pipeline by id in the state manager under the read lock.
     *
     * @param pipelineID id of the pipeline to fetch
     * @return the matching pipeline
     * @throws PipelineNotFoundException propagated from the state manager lookup
     */
    @Override
    public Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException {
        lock.readLock().lock();
        try {
            return stateManager.getPipeline(pipelineID);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Reports whether a pipeline with the given id can be fetched via
     * {@link #getPipeline(PipelineID)}.
     *
     * @param pipelineID id of the pipeline to look for
     * @return {@code true} if the lookup succeeds, {@code false} if it throws
     *         {@link PipelineNotFoundException}
     */
    @Override
    public boolean containsPipeline(PipelineID pipelineID) {
        lock.readLock().lock();
        try {
            // Only the outcome of the lookup matters; the pipeline itself is discarded.
            getPipeline(pipelineID);
            return true;
        } catch (PipelineNotFoundException notFound) {
            return false;
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * @return the pipelines returned by the state manager, read under the read lock
     */
    @Override
    public List<Pipeline> getPipelines() {
        lock.readLock().lock();
        try {
            return stateManager.getPipelines();
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Queries the state manager for pipelines of one replication type under the read lock.
     *
     * @param type replication type to filter by
     * @return the pipelines returned by the state manager for that type
     */
    @Override
    public List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
        lock.readLock().lock();
        try {
            return stateManager.getPipelines(type);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Queries the state manager for pipelines by replication type and factor under the read lock.
     *
     * @param type   replication type to filter by
     * @param factor replication factor to filter by
     * @return the pipelines returned by the state manager for that type and factor
     */
    @Override
    public List<Pipeline> getPipelines(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) {
        lock.readLock().lock();
        try {
            return stateManager.getPipelines(type, factor);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Queries the state manager for pipelines by replication type and pipeline state under the read lock.
     *
     * @param type  replication type to filter by
     * @param state pipeline state to filter by
     * @return the pipelines returned by the state manager for that type and state
     */
    @Override
    public List<Pipeline> getPipelines(HddsProtos.ReplicationType type, Pipeline.PipelineState state) {
        lock.readLock().lock();
        try {
            return stateManager.getPipelines(type, state);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Queries the state manager for pipelines by replication type, factor and state under the read lock.
     *
     * @param type   replication type to filter by
     * @param factor replication factor to filter by
     * @param state  pipeline state to filter by
     * @return the pipelines returned by the state manager for those criteria
     */
    @Override
    public List<Pipeline> getPipelines(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, Pipeline.PipelineState state) {
        lock.readLock().lock();
        try {
            return stateManager.getPipelines(type, factor, state);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Queries the state manager for pipelines by replication type, factor and state,
     * passing along the datanodes and pipelines to exclude. Runs under the read lock.
     *
     * @param type             replication type to filter by
     * @param factor           replication factor to filter by
     * @param state            pipeline state to filter by
     * @param excludeDns       datanodes handed to the state manager as exclusions
     * @param excludePipelines pipeline ids handed to the state manager as exclusions
     * @return the pipelines returned by the state manager for those criteria
     */
    @Override
    public List<Pipeline> getPipelines(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns, Collection<PipelineID> excludePipelines) {
        lock.readLock().lock();
        try {
            return stateManager.getPipelines(type, factor, state, excludeDns, excludePipelines);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Associates a container with a pipeline in the state manager under the write lock.
     *
     * @param pipelineID  pipeline to add the container to
     * @param containerID container to add
     * @throws IOException propagated from the state manager
     */
    @Override
    public void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException {
        lock.writeLock().lock();
        try {
            stateManager.addContainerToPipeline(pipelineID, containerID);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Removes a container from a pipeline in the state manager under the write lock.
     *
     * @param pipelineID  pipeline to remove the container from
     * @param containerID container to remove
     * @throws IOException propagated from the state manager
     */
    @Override
    public void removeContainerFromPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException {
        lock.writeLock().lock();
        try {
            stateManager.removeContainerFromPipeline(pipelineID, containerID);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Fetches the containers of a pipeline from the state manager under the read lock.
     *
     * @param pipelineID pipeline whose containers are requested
     * @return the container ids returned by the state manager
     * @throws IOException propagated from the state manager
     */
    @Override
    public NavigableSet<ContainerID> getContainersInPipeline(PipelineID pipelineID) throws IOException {
        lock.readLock().lock();
        try {
            return stateManager.getContainers(pipelineID);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Returns the container count the state manager reports for a pipeline.
     * This method does not take this manager's lock.
     *
     * @param pipelineID pipeline whose container count is requested
     * @return the count returned by the state manager
     * @throws IOException propagated from the state manager
     */
    @Override
    public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
        final int containerCount = stateManager.getNumberOfContainers(pipelineID);
        return containerCount;
    }

    /**
     * Opens a pipeline in the state manager and then updates the metrics:
     * the created counter is incremented and per-pipeline metrics are created.
     * Runs under the write lock.
     *
     * @param pipelineId id of the pipeline to open
     * @throws IOException propagated from the state manager
     */
    @Override
    public void openPipeline(PipelineID pipelineId) throws IOException {
        lock.writeLock().lock();
        try {
            final Pipeline opened = stateManager.openPipeline(pipelineId);
            metrics.incNumPipelineCreated();
            metrics.createPerPipelineMetrics(opened);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Finalizes the pipeline and then destroys it, either immediately or after a delay.
     *
     * @param pipeline  pipeline to finalize and destroy
     * @param onTimeout when {@code true}, destruction is scheduled after the
     *                  "ozone.scm.pipeline.destroy.timeout" duration (default "66s");
     *                  when {@code false}, the pipeline is destroyed on the calling thread
     * @throws IOException if finalizing or (immediate) destruction fails
     */
    @Override
    public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) throws IOException {
        LOG.info("Destroying pipeline:{}", pipeline);
        finalizePipeline(pipeline.getId());

        if (!onTimeout) {
            destroyPipeline(pipeline);
            return;
        }

        // Deferred destruction: the scheduler receives the logger and the message to use on failure.
        final long destroyDelayMillis = conf.getTimeDuration("ozone.scm.pipeline.destroy.timeout", "66s", TimeUnit.MILLISECONDS);
        final String failureMessage = String.format("Destroy pipeline failed for pipeline:%s", pipeline);
        scheduler.schedule(() -> destroyPipeline(pipeline), destroyDelayMillis, TimeUnit.MILLISECONDS, LOG, failureMessage);
    }

    /**
     * Destroys pipelines that have stayed in the ALLOCATED state for too long.
     *
     * <p>Only RATIS pipelines with replication factor THREE are scrubbed; any other
     * combination returns immediately. A pipeline qualifies once the time since its
     * creation timestamp reaches "ozone.scm.pipeline.allocated.timeout" (default "5m").
     * Qualifying pipelines are finalized and destroyed immediately.
     *
     * @param type   replication type to scrub
     * @param factor replication factor to scrub
     * @throws IOException if finalizing or destroying a pipeline fails
     */
    @Override
    public void scrubPipeline(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) throws IOException {
        if (type != HddsProtos.ReplicationType.RATIS || factor != HddsProtos.ReplicationFactor.THREE) {
            return;
        }
        Instant currentTime = Instant.now();
        long pipelineScrubTimeoutInMillis = this.conf.getTimeDuration("ozone.scm.pipeline.allocated.timeout", "5m", TimeUnit.MILLISECONDS);
        List<Pipeline> needToScrubPipelines = this.stateManager.getPipelines(type, factor, Pipeline.PipelineState.ALLOCATED).stream()
            .filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp().toEpochMilli() >= pipelineScrubTimeoutInMillis)
            .collect(Collectors.toList());
        for (Pipeline p : needToScrubPipelines) {
            // Duration.between(start, end): creation time comes first so the reported age is positive.
            long allocatedMinutes = Duration.between(p.getCreationTimestamp(), currentTime).toMinutes();
            LOG.info("Scrubbing pipeline: id: {} since it stays at ALLOCATED stage for {} mins.", p.getId(), allocatedMinutes);
            this.finalizeAndDestroyPipeline(p, false);
        }
    }

    /**
     * Counts the pipelines known to the state manager by pipeline state.
     *
     * @return a map from every {@code Pipeline.PipelineState} name to the number of
     *         pipelines currently in that state (0 for states with no pipelines)
     */
    @Override
    public Map<String, Integer> getPipelineInfo() {
        final Map<String, Integer> countsByState = new HashMap<>();
        // Seed every state so that states without pipelines still appear with 0.
        for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
            countsByState.put(state.toString(), 0);
        }
        for (Pipeline pipeline : stateManager.getPipelines()) {
            final String stateName = pipeline.getPipelineState().toString();
            countsByState.computeIfPresent(stateName, (name, count) -> count + 1);
        }
        return countsByState;
    }

    /**
     * Starts the fixed-interval pipeline creator of the background pipeline creator.
     */
    @Override
    public void startPipelineCreator() {
        backgroundPipelineCreator.startFixedIntervalPipelineCreator();
    }

    /**
     * Asks the background pipeline creator to trigger pipeline creation.
     */
    @Override
    public void triggerPipelineCreation() {
        backgroundPipelineCreator.triggerPipelineCreation();
    }

    /**
     * Activates a pipeline by delegating to the state manager.
     * This method does not take this manager's lock.
     *
     * @param pipelineID id of the pipeline to activate
     * @throws IOException propagated from the state manager
     */
    @Override
    public void activatePipeline(PipelineID pipelineID) throws IOException {
        stateManager.activatePipeline(pipelineID);
    }

    /**
     * Deactivates a pipeline by delegating to the state manager.
     * This method does not take this manager's lock.
     *
     * @param pipelineID id of the pipeline to deactivate
     * @throws IOException propagated from the state manager
     */
    @Override
    public void deactivatePipeline(PipelineID pipelineID) throws IOException {
        stateManager.deactivatePipeline(pipelineID);
    }

    /**
     * Blocks until the given pipeline is open, polling the state manager every 100 ms.
     *
     * @param pipelineID id of the pipeline to wait for
     * @param timeout    maximum time to wait in milliseconds; {@code 0} selects the
     *                   default read from "hdds.pipeline.report.interval"
     * @throws PipelineNotFoundException if the state manager does not know the pipeline
     * @throws IOException if the pipeline is not open within the timeout, or if the
     *                     waiting thread is interrupted (the interrupt flag is restored)
     */
    @Override
    public void waitPipelineReady(PipelineID pipelineID, long timeout) throws IOException {
        final long startMillis = Time.monotonicNow();
        final long effectiveTimeout = timeout == 0L ? this.pipelineWaitDefaultTimeout : timeout;
        while (true) {
            Pipeline pipeline;
            try {
                pipeline = this.stateManager.getPipeline(pipelineID);
            } catch (PipelineNotFoundException e) {
                throw new PipelineNotFoundException(String.format("Pipeline %s cannot be found", pipelineID));
            }
            if (pipeline.isOpen()) {
                return;
            }
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Stop waiting: with the flag restored, every further sleep would throw
                // immediately and the loop would busy-spin until the timeout expired.
                Thread.currentThread().interrupt();
                throw new IOException(String.format("Interrupted while waiting for pipeline %s to be ready", pipelineID), e);
            }
            if (Time.monotonicNow() - startMillis >= effectiveTimeout) {
                throw new IOException(String.format("Pipeline %s is not ready in %d ms", pipelineID, effectiveTimeout));
            }
        }
    }

    /**
     * Finalizes a pipeline in the state manager, fires a CLOSE_CONTAINER event for
     * each of its containers and removes its per-pipeline metrics. Runs under the
     * write lock.
     *
     * @param pipelineId id of the pipeline to finalize
     * @throws IOException propagated from the state manager
     */
    private void finalizePipeline(PipelineID pipelineId) throws IOException {
        lock.writeLock().lock();
        try {
            stateManager.finalizePipeline(pipelineId);
            for (ContainerID containerToClose : stateManager.getContainers(pipelineId)) {
                eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerToClose);
            }
            metrics.removePipelineMetrics(pipelineId);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Closes the pipeline through the pipeline factory, removes it from this
     * manager and then triggers background pipeline creation.
     *
     * @param pipeline pipeline to destroy
     * @throws IOException if closing or removing the pipeline fails
     */
    protected void destroyPipeline(Pipeline pipeline) throws IOException {
        pipelineFactory.close(pipeline.getType(), pipeline);
        final PipelineID destroyedId = pipeline.getId();
        removePipeline(destroyedId);
        triggerPipelineCreation();
    }

    /**
     * Deletes the pipeline from the pipeline store, removes it from the state
     * manager and node manager, and counts it as destroyed. Runs under the write lock.
     *
     * @param pipelineId id of the pipeline to remove
     * @throws IOException if any step fails; the destroy-failed metric is
     *                     incremented before the exception is rethrown
     */
    protected void removePipeline(PipelineID pipelineId) throws IOException {
        lock.writeLock().lock();
        try {
            final byte[] storeKey = pipelineId.getProtobuf().toByteArray();
            pipelineStore.delete(storeKey);
            final Pipeline removed = stateManager.removePipeline(pipelineId);
            nodeManager.removePipeline(removed);
            metrics.incNumPipelineDestroyed();
        } catch (IOException ex) {
            metrics.incNumPipelineDestroyFailed();
            throw ex;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Increments the blocks-allocated metric for the given pipeline.
     *
     * @param id id of the pipeline the metric is recorded for
     */
    @Override
    public void incNumBlocksAllocatedMetric(PipelineID id) {
        metrics.incNumBlocksAllocated(id);
    }

    /**
     * Releases the resources held by this manager: closes the scheduler and the
     * pipeline store, unregisters the MBean and the metrics, and shuts down the
     * pipeline factory.
     *
     * <p>Each later step still runs when an earlier one throws, so a failure to
     * close the scheduler or the store does not leave the remaining resources
     * registered. The pipeline factory is only shut down when one is present,
     * because the protected constructor accepts a null factory.
     *
     * @throws IOException if closing the scheduler or the pipeline store fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.scheduler != null) {
                this.scheduler.close();
                this.scheduler = null;
            }
        } finally {
            try {
                if (this.pipelineStore != null) {
                    this.pipelineStore.close();
                    this.pipelineStore = null;
                }
            } finally {
                if (this.pmInfoBean != null) {
                    MBeans.unregister(this.pmInfoBean);
                    this.pmInfoBean = null;
                }
                if (this.metrics != null) {
                    this.metrics.unRegister();
                }
                if (this.pipelineFactory != null) {
                    this.pipelineFactory.shutdown();
                }
            }
        }
    }

    /**
     * Resolves the location of the pipeline database.
     *
     * @param configuration configuration the SCM db directory is resolved from
     * @return the file "scm-pipeline.db" inside the SCM db directory
     */
    protected File getPipelineDBPath(Configuration configuration) {
        return new File(ServerUtils.getScmDbDir(configuration), "scm-pipeline.db");
    }

    /**
     * @return the read/write lock this manager acquires in its own methods
     */
    protected ReadWriteLock getLock() {
        return lock;
    }

    /**
     * @return the pipeline factory used by this manager
     */
    @VisibleForTesting
    public PipelineFactory getPipelineFactory() {
        return pipelineFactory;
    }

    /**
     * @return the pipeline store, or {@code null} once {@code close()} has released it
     */
    protected MetadataStore getPipelineStore() {
        return pipelineStore;
    }

    /**
     * @return the node manager supplied at construction time
     */
    protected NodeManager getNodeManager() {
        return nodeManager;
    }

    /**
     * Updates the safe-mode flag held by this manager.
     *
     * @param safeModeStatus new value of the flag
     */
    @Override
    public void setSafeModeStatus(boolean safeModeStatus) {
        isInSafeMode.set(safeModeStatus);
    }

    /**
     * @return the current value of the safe-mode flag held by this manager
     */
    @Override
    public boolean getSafeModeStatus() {
        final boolean inSafeMode = isInSafeMode.get();
        return inSafeMode;
    }
}

