/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.test;

import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.SpringBrokerBridge;
import io.camunda.zeebe.broker.system.EmbeddedGatewayService;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.ExporterCfg;
import io.camunda.zeebe.broker.system.configuration.NetworkCfg;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.test.util.TestConfigurationFactory;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.allocation.DirectBufferAllocator;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.agrona.LangUtil;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Files;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JUnit 4 rule that starts an embedded {@link Broker} before a test and stops it afterwards.
 *
 * <p>The broker configuration is read from the stream returned by the config supplier (or a
 * default {@link BrokerCfg} when the supplier yields {@code null}), a {@link RecordingExporter}
 * is registered under {@link #TEST_RECORD_EXPORTER_ID}, the given configurators are applied, and
 * finally the network ports are taken from {@link SocketUtil}. The broker runs on a
 * {@link ControlledActorClock} and keeps its data in a temporary folder that is deleted in
 * {@link #after()}.
 */
public class EmbeddedBrokerRule extends ExternalResource {
    public static final String DEFAULT_CONFIG_FILE = "zeebe.test.cfg.yaml";
    /** Default time, in seconds, to wait for the partitions to get a leader on startup. */
    public static final int DEFAULT_TIMEOUT = 25;
    public static final String TEST_RECORD_EXPORTER_ID = "test-recorder";
    protected static final Logger LOG = LoggerFactory.getLogger("io.camunda.zeebe.test");
    private static final String SNAPSHOTS_DIRECTORY = "snapshots";
    private static final String STATE_DIRECTORY = "state";
    protected final Supplier<InputStream> configSupplier;
    /** Wall-clock timestamp (ms) of the start of the phase currently being timed for logging. */
    protected long startTime;
    private final Consumer<BrokerCfg>[] configurators;
    private final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    private final BrokerCfg brokerCfg;
    private Broker broker;
    private final ControlledActorClock controlledActorClock = new ControlledActorClock();
    private final SpringBrokerBridge springBrokerBridge = new SpringBrokerBridge();
    private final Duration timeout;
    private final File newTemporaryFolder;
    /** Data directory reported by the broker on its last start; {@code null} until then. */
    private String dataDirectory;
    private SystemContext systemContext;

    /**
     * Creates a rule using {@link #DEFAULT_CONFIG_FILE} and the default timeout.
     *
     * @param configurators callbacks applied, in order, to the loaded configuration
     */
    @SafeVarargs
    public EmbeddedBrokerRule(Consumer<BrokerCfg> ... configurators) {
        this(DEFAULT_CONFIG_FILE, configurators);
    }

    /**
     * Creates a rule that loads its configuration from the classpath and uses the default timeout.
     *
     * @param configFileClasspathLocation classpath location of the configuration file
     * @param configurators callbacks applied, in order, to the loaded configuration
     */
    @SafeVarargs
    public EmbeddedBrokerRule(String configFileClasspathLocation, Consumer<BrokerCfg> ... configurators) {
        this(() -> EmbeddedBrokerRule.class.getClassLoader().getResourceAsStream(configFileClasspathLocation), DEFAULT_TIMEOUT, configurators);
    }

    /**
     * Creates a rule with a startup timeout given in seconds.
     *
     * @param configSupplier supplies the configuration stream; may supply {@code null}
     * @param timeout startup timeout in seconds
     * @param configurators callbacks applied, in order, to the loaded configuration
     */
    @SafeVarargs
    public EmbeddedBrokerRule(Supplier<InputStream> configSupplier, int timeout, Consumer<BrokerCfg> ... configurators) {
        this(configSupplier, Duration.ofSeconds(timeout), configurators);
    }

    /**
     * Creates a rule, loading and configuring the {@link BrokerCfg} immediately.
     *
     * @param configSupplier supplies the configuration stream; when it supplies {@code null} a
     *     default {@link BrokerCfg} is used
     * @param timeout how long {@link #startBroker()} waits for the partitions to get a leader
     * @param configurators callbacks applied, in order, to the loaded configuration
     * @throws NullPointerException if any argument is {@code null}
     * @throws RuntimeException if closing the configuration stream fails
     */
    @SafeVarargs
    public EmbeddedBrokerRule(Supplier<InputStream> configSupplier, Duration timeout, Consumer<BrokerCfg> ... configurators) {
        this.configSupplier = Objects.requireNonNull(configSupplier, "configSupplier must not be null");
        this.configurators = Objects.requireNonNull(configurators, "configurators must not be null");
        this.timeout = Objects.requireNonNull(timeout, "timeout must not be null");
        try (InputStream configStream = configSupplier.get()) {
            if (configStream == null) {
                this.brokerCfg = new BrokerCfg();
            } else {
                this.brokerCfg = new TestConfigurationFactory().create(null, "zeebe.broker", configStream, BrokerCfg.class);
            }
            // NOTE: configureBroker is overridable and runs before any subclass constructor body.
            configureBroker(this.brokerCfg);
        } catch (IOException e) {
            throw new RuntimeException("Unable to open configuration", e);
        }
        // Created last so that a configuration failure does not leave a stray folder on disk.
        this.newTemporaryFolder = Files.newTemporaryFolder();
    }

    /**
     * Deletes the {@code snapshots} folder below {@code parentDir}, if it exists.
     *
     * @throws RuntimeException if the folder cannot be deleted
     */
    private static void deleteSnapshots(File parentDir) {
        File snapshotDirectory = new File(parentDir, SNAPSHOTS_DIRECTORY);
        if (!snapshotDirectory.exists()) {
            return;
        }
        try {
            FileUtil.deleteFolder(snapshotDirectory.getAbsolutePath());
        } catch (IOException e) {
            throw new RuntimeException("Could not delete snapshot directory " + snapshotDirectory.getAbsolutePath(), e);
        }
    }

    /**
     * Sets the gateway, command API, internal API and monitoring API ports of the given
     * configuration, each to the port of a fresh {@link SocketUtil#getNextAddress()}.
     *
     * @param brokerCfg the configuration to modify in place
     */
    public static void assignSocketAddresses(BrokerCfg brokerCfg) {
        NetworkCfg network = brokerCfg.getNetwork();
        brokerCfg.getGateway().getNetwork().setPort(SocketUtil.getNextAddress().getPort());
        network.getCommandApi().setPort(SocketUtil.getNextAddress().getPort());
        network.getInternalApi().setPort(SocketUtil.getNextAddress().getPort());
        network.getMonitoringApi().setPort(SocketUtil.getNextAddress().getPort());
    }

    /**
     * Wraps the test statement with the {@link RecordingExporterTestWatcher} first and then with
     * this rule's own before/after handling.
     */
    public Statement apply(Statement base, Description description) {
        Statement watched = recordingExporterTestWatcher.apply(base, description);
        return super.apply(watched, description);
    }

    /** Starts the broker and logs how long the startup took. */
    public void before() {
        startTime = System.currentTimeMillis();
        startBroker();
        LOG.info("\n====\nBroker startup time: {}\n====\n", System.currentTimeMillis() - startTime);
        // From here on startTime marks the beginning of the test execution.
        startTime = System.currentTimeMillis();
    }

    /**
     * Stops the broker, logs timings and leftover direct-buffer allocations, then deletes the
     * temporary folder and resets the controlled clock. The cleanup runs even if stopping fails.
     */
    public void after() {
        try {
            LOG.info("Test execution time: {}", System.currentTimeMillis() - startTime);
            startTime = System.currentTimeMillis();
            stopBroker();
            LOG.info("Broker closing time: {}", System.currentTimeMillis() - startTime);
            long allocatedMemoryInKb = DirectBufferAllocator.getAllocatedMemoryInKb();
            if (allocatedMemoryInKb > 0L) {
                LOG.warn("There are still allocated direct buffers of a total size of {}kB.", allocatedMemoryInKb);
            }
        } finally {
            try {
                FileUtil.deleteFolder(newTemporaryFolder.getAbsolutePath());
            } catch (IOException e) {
                LOG.error("Unexpected error on deleting data.", e);
            } finally {
                // Reset the clock even if the deletion fails with an unchecked exception.
                controlledActorClock.reset();
            }
        }
    }

    /** Returns the configuration the broker is (or will be) started with. */
    public BrokerCfg getBrokerCfg() {
        return brokerCfg;
    }

    /** Returns the socket address of the gateway network configuration. */
    public InetSocketAddress getGatewayAddress() {
        return brokerCfg.getGateway().getNetwork().toSocketAddress();
    }

    /** Returns the controllable clock handed to the broker's system context. */
    public ControlledActorClock getClock() {
        return controlledActorClock;
    }

    /** Stops the broker (if running) and starts it again with the same configuration. */
    public void restartBroker() {
        stopBroker();
        startBroker();
    }

    /**
     * Closes the broker and stops its scheduler; does nothing when no broker is running.
     *
     * <p>The scheduler is stopped and the references are cleared even if closing the broker
     * throws, so a failed close does not leak the scheduler.
     */
    public void stopBroker() {
        if (broker == null) {
            return;
        }
        try {
            broker.close();
        } finally {
            broker = null;
            stopScheduler();
        }
        System.gc();
    }

    /** Stops the scheduler of the current system context and waits for it to finish. */
    private void stopScheduler() {
        if (systemContext == null) {
            return;
        }
        try {
            systemContext.getScheduler().stop().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating so callers can still observe it.
            Thread.currentThread().interrupt();
            LangUtil.rethrowUnchecked(e);
        } catch (ExecutionException e) {
            LangUtil.rethrowUnchecked(e);
        } finally {
            systemContext = null;
        }
    }

    /**
     * Creates a new system context and broker, starts them, and waits until
     * {@code onBecomingLeader} has been reported once per configured partition.
     *
     * <p>If an embedded gateway is present, additionally waits until the gateway's topology
     * reports a leader for partition 1.
     *
     * @throws AssertionError if the leader notifications do not arrive within the timeout
     */
    public void startBroker() {
        systemContext = new SystemContext(brokerCfg, newTemporaryFolder.getAbsolutePath(), controlledActorClock);
        systemContext.getScheduler().start();
        broker = new Broker(systemContext, springBrokerBridge);
        CountDownLatch leaderLatch = new CountDownLatch(brokerCfg.getCluster().getPartitionsCount());
        broker.addPartitionListener(new LeaderPartitionListener(leaderLatch));
        broker.start().join();
        try {
            boolean hasLeaderPartition = leaderLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
            Assertions.assertThat(hasLeaderPartition)
                .describedAs("Expected the broker to have a leader of the partition within %s", timeout)
                .isTrue();
        } catch (InterruptedException e) {
            // An interrupt is not a timeout; a real timeout is reported by the assertion above.
            LOG.info("Interrupted while waiting for the broker to have a partition leader (timeout {})", timeout, e);
            Thread.currentThread().interrupt();
        }
        EmbeddedGatewayService embeddedGatewayService = broker.getEmbeddedGatewayService();
        if (embeddedGatewayService != null) {
            BrokerClient brokerClient = embeddedGatewayService.get().getBrokerClient();
            // NOTE(review): only partition 1 is checked here; presumably a negative leader id
            // means "no leader known yet" - confirm against BrokerClusterState.
            TestUtil.waitUntil(() -> {
                BrokerTopologyManager topologyManager = brokerClient.getTopologyManager();
                BrokerClusterState topology = topologyManager.getTopology();
                return topology != null && topology.getLeaderForPartition(1) >= 0;
            });
        }
        dataDirectory = broker.getSystemContext().getBrokerConfiguration().getData().getDirectory();
    }

    /**
     * Prepares the given configuration: registers the {@link RecordingExporter} under
     * {@link #TEST_RECORD_EXPORTER_ID}, applies the configurators in order, and assigns the
     * network ports last (so ports set by a configurator are overwritten).
     *
     * @param brokerCfg the configuration to modify in place
     */
    public void configureBroker(BrokerCfg brokerCfg) {
        ExporterCfg exporterCfg = new ExporterCfg();
        exporterCfg.setClassName(RecordingExporter.class.getName());
        brokerCfg.getExporters().put(TEST_RECORD_EXPORTER_ID, exporterCfg);
        for (Consumer<BrokerCfg> configurator : configurators) {
            configurator.accept(brokerCfg);
        }
        EmbeddedBrokerRule.assignSocketAddresses(brokerCfg);
    }

    /**
     * Deletes the {@code state/snapshots} folder of every sub-directory of the broker's data
     * directory. Sub-directories without a {@code state} folder are skipped.
     *
     * @throws NullPointerException if the broker has never been started, so no data directory
     *     is known yet
     * @throws RuntimeException if a snapshot folder cannot be deleted
     */
    public void purgeSnapshots() {
        Objects.requireNonNull(dataDirectory, "No data directory known; the broker must be started before purging snapshots");
        File[] partitionDirectories = new File(dataDirectory).listFiles(File::isDirectory);
        if (partitionDirectories == null) {
            // The data directory does not exist or could not be listed.
            return;
        }
        for (File partitionDirectory : partitionDirectories) {
            File stateDirectory = new File(partitionDirectory, STATE_DIRECTORY);
            if (stateDirectory.exists()) {
                EmbeddedBrokerRule.deleteSnapshots(stateDirectory);
            }
        }
    }

    /** Partition listener that counts down a latch each time a partition becomes leader. */
    private static class LeaderPartitionListener implements PartitionListener {
        private final CountDownLatch latch;

        LeaderPartitionListener(CountDownLatch latch) {
            this.latch = latch;
        }

        /** No-op; returns an already completed future. */
        public ActorFuture<Void> onBecomingFollower(int partitionId, long term) {
            return CompletableActorFuture.completed(null);
        }

        /** Counts the latch down once and returns an already completed future. */
        public ActorFuture<Void> onBecomingLeader(int partitionId, long term, LogStream logStream, QueryService queryService) {
            latch.countDown();
            return CompletableActorFuture.completed(null);
        }

        /** No-op; returns an already completed future. */
        public ActorFuture<Void> onBecomingInactive(int partitionId, long term) {
            return CompletableActorFuture.completed(null);
        }
    }
}

