/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.test;

import io.atomix.cluster.AtomixCluster;
import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.SpringBrokerBridge;
import io.camunda.zeebe.broker.TestLoggers;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.test.EmbeddedBrokerConfigurator;
import io.camunda.zeebe.broker.test.TestActorSchedulerFactory;
import io.camunda.zeebe.broker.test.TestBrokerClientFactory;
import io.camunda.zeebe.broker.test.TestClusterFactory;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.Topology;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.test.util.TestConfigurationFactory;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.allocation.DirectBufferAllocator;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.netty.util.NetUtil;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.agrona.LangUtil;
import org.assertj.core.util.Files;
import org.awaitility.Awaitility;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;

public final class EmbeddedBrokerRule
extends ExternalResource {
    /** Classpath location of the configuration file used when a constructor is not given one. */
    public static final String DEFAULT_CONFIG_FILE = "zeebe.test.cfg.yaml";
    /** Timeout amount, expressed in {@link #INSTALL_TIMEOUT_UNIT}. */
    public static final int INSTALL_TIMEOUT = 5;
    /** Time unit of {@link #INSTALL_TIMEOUT}; also the unit of the latch wait in startBroker. */
    public static final TimeUnit INSTALL_TIMEOUT_UNIT = TimeUnit.MINUTES;
    /** Logger shared by the rule; taken from the test loggers holder. */
    protected static final Logger LOG = TestLoggers.TEST_LOGGER;
    // NOTE(review): the two ENABLE_* flags are not referenced anywhere in this file;
    // presumably their uses were constant-folded away - confirm before removing.
    private static final boolean ENABLE_DEBUG_EXPORTER = false;
    private static final boolean ENABLE_HTTP_EXPORTER = false;
    /** Name of the folder that deleteSnapshots removes from a given parent directory. */
    private static final String SNAPSHOTS_DIRECTORY = "snapshots";
    /** Name of the folder that purgeSnapshots looks for inside each partition directory. */
    private static final String STATE_DIRECTORY = "state";
    /** Watcher that apply() wraps around the test statement before the rule itself. */
    protected final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    /** Provides the configuration stream; a null stream makes startBroker use a default BrokerCfg. */
    protected final Supplier<InputStream> configSupplier;
    /** Callbacks applied, in array order, to the broker configuration by configureBroker. */
    protected final Consumer<BrokerCfg>[] configurators;
    /** Broker configuration; created on the first startBroker call and reused on later starts. */
    protected BrokerCfg brokerCfg;
    /** The running broker, or null before the first start and after stopBroker. */
    protected Broker broker;
    /** Clock handed to the actor scheduler on start and reset in after(). */
    protected final ControlledActorClock controlledActorClock = new ControlledActorClock();
    /** Bridge instance passed to every Broker created by startBroker. */
    protected final SpringBrokerBridge springBrokerBridge = new SpringBrokerBridge();
    /** Wall-clock timestamp in milliseconds, re-set at each phase to measure startup, test and shutdown time. */
    protected long startTime;
    /** Cluster created by the most recent startBroker call. */
    private AtomixCluster atomixCluster;
    /** Temporary folder created in before() and deleted in after(). */
    private File brokerBase;
    /** Data directory read from the broker configuration at the end of startBroker; used by purgeSnapshots. */
    private String dataDirectory;
    /** System context of the running broker; set by startBroker and cleared by stopBroker. */
    private SystemContext systemContext;
    /** Registry passed to the Atomix cluster factory; cleared and closed in after(). */
    private final MeterRegistry meterRegistry = new SimpleMeterRegistry();

    /**
     * Creates a rule that reads its configuration from {@link #DEFAULT_CONFIG_FILE} on the
     * classpath.
     *
     * @param configurators callbacks applied to the broker configuration when it is first built
     */
    @SafeVarargs
    public EmbeddedBrokerRule(final Consumer<BrokerCfg>... configurators) {
        this(DEFAULT_CONFIG_FILE, configurators);
    }

    /**
     * Creates a rule that reads its configuration from the given classpath resource. The resource
     * is looked up through this class' class loader each time the supplier is invoked.
     *
     * @param configFileClasspathLocation classpath location of the configuration file
     * @param configurators callbacks applied to the broker configuration when it is first built
     */
    @SafeVarargs
    public EmbeddedBrokerRule(
            final String configFileClasspathLocation, final Consumer<BrokerCfg>... configurators) {
        this(
            () -> EmbeddedBrokerRule.class.getClassLoader().getResourceAsStream(configFileClasspathLocation),
            configurators);
    }

    /**
     * Creates a rule that obtains its configuration stream from the given supplier.
     *
     * @param configSupplier source of the configuration stream; it is only invoked when the broker
     *     is started and no configuration exists yet
     * @param configurators callbacks applied to the broker configuration when it is first built
     */
    @SafeVarargs
    public EmbeddedBrokerRule(
            final Supplier<InputStream> configSupplier, final Consumer<BrokerCfg>... configurators) {
        this.configurators = configurators;
        this.configSupplier = configSupplier;
    }

    /**
     * Deletes the {@code snapshots} folder located directly under {@code parentDir}, if it exists.
     *
     * @param parentDir directory expected to contain the snapshots folder
     * @throws RuntimeException wrapping the {@link IOException} if the folder cannot be deleted
     */
    private static void deleteSnapshots(final File parentDir) {
        final File snapshots = new File(parentDir, SNAPSHOTS_DIRECTORY);
        if (!snapshots.exists()) {
            // nothing to clean up
            return;
        }

        final String snapshotsPath = snapshots.getAbsolutePath();
        try {
            FileUtil.deleteFolder(snapshotsPath);
        } catch (final IOException e) {
            throw new RuntimeException("Could not delete snapshot directory " + snapshotsPath, e);
        }
    }

    /**
     * Assigns a freshly obtained port to each of the four network endpoints of the given
     * configuration: gateway API, gateway cluster, command API and internal API (in that order).
     *
     * @param brokerCfg the configuration to update
     */
    public static void assignSocketAddresses(final BrokerCfg brokerCfg) {
        final int gatewayApiPort = SocketUtil.getNextAddress().getPort();
        EmbeddedBrokerConfigurator.setGatewayApiPort(gatewayApiPort).accept(brokerCfg);

        final int gatewayClusterPort = SocketUtil.getNextAddress().getPort();
        EmbeddedBrokerConfigurator.setGatewayClusterPort(gatewayClusterPort).accept(brokerCfg);

        final int commandApiPort = SocketUtil.getNextAddress().getPort();
        EmbeddedBrokerConfigurator.setCommandApiPort(commandApiPort).accept(brokerCfg);

        final int internalApiPort = SocketUtil.getNextAddress().getPort();
        EmbeddedBrokerConfigurator.setInternalApiPort(internalApiPort).accept(brokerCfg);
    }

    /**
     * Wraps the test statement first with the recording exporter watcher and then with this rule,
     * so the broker lifecycle (before/after) surrounds the watcher.
     *
     * @param base the statement to wrap
     * @param description description of the test being run
     * @return the wrapped statement
     */
    public Statement apply(final Statement base, final Description description) {
        return super.apply(recordingExporterTestWatcher.apply(base, description), description);
    }

    /**
     * Creates a temporary base folder, starts the broker without extra partition listeners and
     * logs how long the startup took. Afterwards {@code startTime} is re-set so that it marks the
     * beginning of the test execution.
     */
    public void before() {
        brokerBase = Files.newTemporaryFolder();

        startTime = System.currentTimeMillis();
        startBroker();
        final long startupMillis = System.currentTimeMillis() - startTime;
        LOG.info("\n====\nBroker startup time: {}\n====\n", startupMillis);

        startTime = System.currentTimeMillis();
    }

    /**
     * Stops the broker and releases everything the rule created: logs test and shutdown durations,
     * warns about direct buffers that are still allocated, deletes the temporary base folder and
     * finally clears/closes the meter registry and resets the controlled clock.
     *
     * <p>The registry and clock cleanup runs even if stopping the broker or deleting the folder
     * fails, and the folder deletion is skipped when no base folder was ever created.
     */
    public void after() {
        try {
            LOG.info("Test execution time: {}", System.currentTimeMillis() - startTime);
            startTime = System.currentTimeMillis();
            stopBroker();
            LOG.info("Broker closing time: {}", System.currentTimeMillis() - startTime);

            final long allocatedMemoryInKb = DirectBufferAllocator.getAllocatedMemoryInKb();
            if (allocatedMemoryInKb > 0L) {
                LOG.warn("There are still allocated direct buffers of a total size of {}kB.", allocatedMemoryInKb);
            }
        } finally {
            try {
                // brokerBase is only assigned in before(); guard against after() being invoked alone
                if (brokerBase != null) {
                    FileUtil.deleteFolder(brokerBase.getAbsolutePath());
                }
            } catch (final IOException e) {
                LOG.error("Unexpected error on deleting data.", e);
            } finally {
                // must not be skipped by an unchecked failure while deleting the folder
                meterRegistry.clear();
                meterRegistry.close();
                controlledActorClock.reset();
            }
        }
    }

    /**
     * @return the broker configuration, or {@code null} if the broker has not been started yet
     */
    public BrokerCfg getBrokerCfg() {
        return brokerCfg;
    }

    /**
     * @return the bridge instance that is passed to every broker created by this rule
     */
    public SpringBrokerBridge getSpringBrokerBridge() {
        return springBrokerBridge;
    }

    /**
     * Looks up the cluster services through the running broker's context.
     *
     * @return the cluster services of the current broker
     * @throws NullPointerException if no broker is currently running
     */
    public ClusterServices getClusterServices() {
        return broker.getBrokerContext().getClusterServices();
    }

    /**
     * @return the cluster created by the most recent broker start, or {@code null} if the broker
     *     was never started
     */
    public AtomixCluster getAtomixCluster() {
        return atomixCluster;
    }

    /**
     * Derives the gateway socket address from the gateway network section of the broker
     * configuration.
     *
     * @return the gateway address
     * @throws NullPointerException if the broker configuration has not been created yet
     */
    public InetSocketAddress getGatewayAddress() {
        return brokerCfg.getGateway().getNetwork().toSocketAddress();
    }

    /**
     * @return the running broker, or {@code null} while the broker is stopped
     */
    public Broker getBroker() {
        return broker;
    }

    /**
     * @return the controlled clock that is handed to the actor scheduler when the broker starts
     */
    public ControlledActorClock getClock() {
        return controlledActorClock;
    }

    /**
     * Stops the broker (a no-op when none is running) and starts it again.
     *
     * @param listeners additional partition listeners to register with the new broker
     */
    public void restartBroker(final PartitionListener... listeners) {
        stopBroker();
        startBroker(listeners);
    }

    /**
     * Closes the running broker and then stops its actor scheduler, blocking until the scheduler
     * has stopped. Does nothing when no broker is running.
     *
     * <p>If the wait is interrupted, the thread's interrupt flag is restored before the exception
     * is rethrown. The system context reference is always cleared once the broker was closed, even
     * if stopping the scheduler fails.
     */
    public void stopBroker() {
        if (broker == null) {
            return;
        }

        broker.close();
        broker = null;
        try {
            systemContext.getScheduler().stop().get();
        } catch (final InterruptedException e) {
            // keep the interruption visible to callers further up the stack
            Thread.currentThread().interrupt();
            LangUtil.rethrowUnchecked(e);
        } catch (final ExecutionException e) {
            LangUtil.rethrowUnchecked(e);
        } finally {
            systemContext = null;
            // NOTE(review): explicit GC hint kept from the original; presumably it helps release
            // direct buffers before after() checks the allocated amount - confirm.
            System.gc();
        }
    }

    /**
     * @return the temporary base folder of the broker as a {@link Path}
     * @throws NullPointerException if the base folder has not been created yet (see before())
     */
    public Path getBrokerBase() {
        return brokerBase.toPath();
    }

    /**
     * Starts a broker and blocks until it is usable.
     *
     * <p>On the first call the broker configuration is created - from the configured stream, or as
     * a default {@code BrokerCfg} when the supplier yields {@code null} - and passed through
     * {@link #configureBroker(BrokerCfg)}; later calls reuse it. Each call creates a new scheduler,
     * Atomix cluster, system context and broker. After the broker start has completed, the method
     * waits up to {@link #INSTALL_TIMEOUT} {@link #INSTALL_TIMEOUT_UNIT} for one leader
     * notification per configured partition and, if the gateway is enabled, until a client sees a
     * complete topology.
     *
     * @param listeners additional partition listeners to register with the broker
     * @throws RuntimeException if the configuration stream cannot be read or closed
     */
    public void startBroker(final PartitionListener... listeners) {
        if (brokerCfg == null) {
            try (InputStream configStream = configSupplier.get()) {
                if (configStream == null) {
                    brokerCfg = new BrokerCfg();
                } else {
                    brokerCfg = (BrokerCfg) new TestConfigurationFactory().create(null, "zeebe.broker", configStream, BrokerCfg.class);
                }
                configureBroker(brokerCfg);
            } catch (final IOException e) {
                throw new RuntimeException("Unable to open configuration", e);
            }
        }

        final ActorScheduler scheduler = TestActorSchedulerFactory.ofBrokerConfig(brokerCfg, controlledActorClock);
        atomixCluster = TestClusterFactory.createAtomixCluster(brokerCfg, meterRegistry);
        systemContext = new SystemContext(brokerCfg, scheduler, atomixCluster, TestBrokerClientFactory.createBrokerClient(atomixCluster, scheduler));

        // one count per partition; counted down by the listener whenever a partition becomes leader
        final CountDownLatch latch = new CountDownLatch(brokerCfg.getCluster().getPartitionsCount());
        final ArrayList<PartitionListener> additionalListeners = new ArrayList<>(Arrays.asList(listeners));
        additionalListeners.add(new LeaderPartitionListener(latch));

        broker = new Broker(systemContext, springBrokerBridge, additionalListeners);
        broker.start().join();

        awaitPartitionLeaders(latch);
        if (brokerCfg.getGateway().isEnable()) {
            awaitCompleteTopology();
        }

        dataDirectory = broker.getSystemContext().getBrokerConfiguration().getData().getDirectory();
    }

    /** Waits for the leader latch to reach zero; a timeout or interruption is logged, not thrown. */
    private void awaitPartitionLeaders(final CountDownLatch latch) {
        try {
            // await returns false on timeout - previously this was silently ignored
            if (!latch.await(INSTALL_TIMEOUT, INSTALL_TIMEOUT_UNIT)) {
                LOG.warn(
                    "Broker did not report a leader for every partition within {} {}; {} notification(s) still missing",
                    INSTALL_TIMEOUT,
                    INSTALL_TIMEOUT_UNIT,
                    latch.getCount());
            }
        } catch (final InterruptedException e) {
            LOG.info("Interrupted while waiting for the broker partitions to become leader", e);
            Thread.currentThread().interrupt();
        }
    }

    /** Polls the gateway with a temporary plaintext client until the reported topology is complete. */
    private void awaitCompleteTopology() {
        final String gatewayAddress = NetUtil.toSocketAddressString(getGatewayAddress());
        try (ZeebeClient client = ZeebeClient.newClientBuilder().gatewayAddress(gatewayAddress).usePlaintext().build()) {
            Awaitility.await("until we have a complete topology").ignoreExceptions().untilAsserted(() -> {
                final Topology topology = (Topology) client.newTopologyRequest().send().join();
                TopologyAssert.assertThat(topology).isComplete(
                    brokerCfg.getCluster().getClusterSize(),
                    brokerCfg.getCluster().getPartitionsCount(),
                    brokerCfg.getCluster().getReplicationFactor());
            });
        }
    }

    /**
     * Prepares the given configuration for an embedded test broker. In order: applies the test
     * recorder configurator, applies every user supplied configurator, assigns socket addresses
     * and initialises the configuration with the broker base folder.
     *
     * <p>Because socket addresses are assigned after the user configurators have run, ports set by
     * a configurator are presumably replaced - verify against the configurator helpers.
     *
     * @param brokerCfg the configuration to modify
     */
    public void configureBroker(final BrokerCfg brokerCfg) {
        EmbeddedBrokerConfigurator.TEST_RECORDER.accept(brokerCfg);
        Arrays.asList(configurators).forEach(configurator -> configurator.accept(brokerCfg));
        assignSocketAddresses(brokerCfg);

        final String basePath = brokerBase.getAbsolutePath();
        brokerCfg.init(basePath);
    }

    /**
     * Removes the snapshots of every partition: for each sub-directory of the broker's data
     * directory that contains a {@code state} folder, the {@code snapshots} folder inside that
     * state folder is deleted. Returns silently if the data directory cannot be listed.
     */
    public void purgeSnapshots() {
        final File dataRoot = new File(dataDirectory);
        final File[] partitionDirectories =
            dataRoot.listFiles((parent, name) -> new File(parent, name).isDirectory());
        if (partitionDirectories != null) {
            for (final File partitionDirectory : partitionDirectories) {
                final File stateDirectory = new File(partitionDirectory, STATE_DIRECTORY);
                if (stateDirectory.exists()) {
                    deleteSnapshots(stateDirectory);
                }
            }
        }
    }

    /**
     * @return the meter registry that is passed to the Atomix cluster factory on broker start
     */
    public MeterRegistry getMeterRegistry() {
        return meterRegistry;
    }

    /**
     * Partition listener that counts down a latch each time a partition becomes leader. Follower
     * and inactive transitions are acknowledged immediately without any further action.
     */
    private static class LeaderPartitionListener
    implements PartitionListener {
        private final CountDownLatch leaderLatch;

        /**
         * @param latch the latch to count down on every leader transition
         */
        LeaderPartitionListener(final CountDownLatch latch) {
            leaderLatch = latch;
        }

        /** No-op; returns an already completed future. */
        public ActorFuture<Void> onBecomingFollower(final int partitionId, final long term) {
            return CompletableActorFuture.completed(null);
        }

        /** Counts the latch down once and returns an already completed future. */
        public ActorFuture<Void> onBecomingLeader(
                final int partitionId, final long term, final LogStream logStream, final QueryService queryService) {
            leaderLatch.countDown();
            return CompletableActorFuture.completed(null);
        }

        /** No-op; returns an already completed future. */
        public ActorFuture<Void> onBecomingInactive(final int partitionId, final long term) {
            return CompletableActorFuture.completed(null);
        }
    }
}

