/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.partitioning;

import io.atomix.cluster.AtomixCluster;
import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.broker.SpringBrokerBridge;
import io.camunda.zeebe.broker.client.api.BrokerClient;
import io.camunda.zeebe.broker.partitioning.PartitionManagerImpl;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.ClusterCfg;
import io.camunda.zeebe.broker.test.EmbeddedBrokerRule;
import io.camunda.zeebe.broker.test.TestActorSchedulerFactory;
import io.camunda.zeebe.broker.test.TestBrokerClientFactory;
import io.camunda.zeebe.broker.test.TestClusterFactory;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.Topology;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Integration tests that start two embedded brokers forming one cluster and then let the second
 * broker (node 1) leave partition 1 through its {@link PartitionManagerImpl}.
 *
 * <p>Every test builds its brokers below the JUnit-provided temporary directory, in the
 * sub-directories {@code broker-0} and {@code broker-1}, and closes both brokers when it finishes.
 */
final class PartitionLeaveTest {
    private static final MeterRegistry METER_REGISTRY = new SimpleMeterRegistry();

    /** Cluster size configured on both brokers in every test. */
    private static final int CLUSTER_SIZE = 2;

    /** Replication factor configured on both brokers in every test. */
    private static final int REPLICATION_FACTOR = 2;

    /** Id of the partition that node 1 is asked to leave. */
    private static final int LEAVING_PARTITION_ID = 1;

    /**
     * After node 1 left partition 1 of a single-partition cluster, publishing a message through the
     * gateway of node 0 must still complete.
     */
    @Test
    void canStillProcessAfterLeaving(@TempDir Path tmp) {
        int partitionsCount = 1;
        Broker broker0 = PartitionLeaveTest.buildSeedBroker(tmp, partitionsCount);
        Broker broker1 = PartitionLeaveTest.buildJoiningBroker(tmp, broker0, partitionsCount);
        PartitionLeaveTest.startBrokers(broker0, broker1);
        try (ZeebeClient client = PartitionLeaveTest.newClient(broker0)) {
            PartitionLeaveTest.awaitCompleteTopology(client, partitionsCount);

            PartitionLeaveTest.partitionManagerOf(broker1).leave(LEAVING_PARTITION_ID).join();

            client.newPublishMessageCommand().messageName("msg").correlationKey("key").send().join();
        } finally {
            PartitionLeaveTest.closeBrokers(broker0, broker1);
        }
    }

    /**
     * After node 1 left partition 1, its directory for that partition must be gone, while its
     * directory for partition 2 and both partition directories of node 0 must still be non-empty.
     */
    @Test
    void shouldRemoveDataAfterLeaving(@TempDir Path tmp) {
        int partitionsCount = 2;
        Broker broker0 = PartitionLeaveTest.buildSeedBroker(tmp, partitionsCount);
        Broker broker1 = PartitionLeaveTest.buildJoiningBroker(tmp, broker0, partitionsCount);
        PartitionLeaveTest.startBrokers(broker0, broker1);
        try (ZeebeClient client = PartitionLeaveTest.newClient(broker0)) {
            PartitionLeaveTest.awaitCompleteTopology(client, partitionsCount);

            PartitionLeaveTest.partitionManagerOf(broker1).leave(LEAVING_PARTITION_ID).join();

            Assertions.assertThat(PartitionLeaveTest.partitionDirectory(tmp, 1, 1)).doesNotExist();
            Assertions.assertThat(PartitionLeaveTest.partitionDirectory(tmp, 1, 2)).isNotEmptyDirectory();
            Assertions.assertThat(PartitionLeaveTest.partitionDirectory(tmp, 0, 1)).isNotEmptyDirectory();
            Assertions.assertThat(PartitionLeaveTest.partitionDirectory(tmp, 0, 2)).isNotEmptyDirectory();
        } finally {
            PartitionLeaveTest.closeBrokers(broker0, broker1);
        }
    }

    /**
     * When node 0 is closed before node 1 tries to leave partition 1, the leave must fail and all
     * four partition directories must still be non-empty.
     */
    @Test
    void shouldNotRemoveDataIfLeavingFails(@TempDir Path tmp) {
        int partitionsCount = 2;
        Broker broker0 = PartitionLeaveTest.buildSeedBroker(tmp, partitionsCount);
        Broker broker1 = PartitionLeaveTest.buildJoiningBroker(tmp, broker0, partitionsCount);
        PartitionLeaveTest.startBrokers(broker0, broker1);
        try (ZeebeClient client = PartitionLeaveTest.newClient(broker0)) {
            PartitionLeaveTest.awaitCompleteTopology(client, partitionsCount);

            // Take the other member down first; the leave below is then expected to throw
            // (presumably because node 1 alone cannot complete the change - confirm).
            // NOTE(review): broker0 is closed a second time during cleanup, exactly as before;
            // this relies on Broker.close() tolerating repeated calls - confirm.
            broker0.close();

            Assertions.assertThatThrownBy(
                    () -> PartitionLeaveTest.partitionManagerOf(broker1).leave(LEAVING_PARTITION_ID).join());

            Assertions.assertThat(PartitionLeaveTest.partitionDirectory(tmp, 1, 1)).isNotEmptyDirectory();
            Assertions.assertThat(PartitionLeaveTest.partitionDirectory(tmp, 1, 2)).isNotEmptyDirectory();
            Assertions.assertThat(PartitionLeaveTest.partitionDirectory(tmp, 0, 1)).isNotEmptyDirectory();
            Assertions.assertThat(PartitionLeaveTest.partitionDirectory(tmp, 0, 2)).isNotEmptyDirectory();
        } finally {
            PartitionLeaveTest.closeBrokers(broker0, broker1);
        }
    }

    /**
     * Builds node 0 in {@code tmp/broker-0}. No initial contact points are configured for it.
     *
     * @param tmp the temporary root directory of the test
     * @param partitionsCount the partition count to configure
     * @return the built, not yet started broker
     */
    private static Broker buildSeedBroker(Path tmp, int partitionsCount) {
        return PartitionLeaveTest.buildBroker(
                tmp.resolve("broker-0"),
                brokerCfg -> PartitionLeaveTest.configureCluster(brokerCfg, 0, partitionsCount));
    }

    /**
     * Builds node 1 in {@code tmp/broker-1}, using the advertised internal API address of
     * {@code seed} as its only initial contact point.
     *
     * @param tmp the temporary root directory of the test
     * @param seed the broker whose advertised internal API address is used as contact point
     * @param partitionsCount the partition count to configure
     * @return the built, not yet started broker
     */
    private static Broker buildJoiningBroker(Path tmp, Broker seed, int partitionsCount) {
        InetSocketAddress seedAddress = seed.getConfig().getNetwork().getInternalApi().getAdvertisedAddress();
        String contactPoint = seedAddress.getHostName() + ":" + seedAddress.getPort();
        return PartitionLeaveTest.buildBroker(tmp.resolve("broker-1"), brokerCfg -> {
            brokerCfg.getCluster().setInitialContactPoints(List.of(contactPoint));
            PartitionLeaveTest.configureCluster(brokerCfg, 1, partitionsCount);
        });
    }

    /**
     * Applies the cluster settings shared by both brokers: cluster size, node id, partition count
     * and replication factor.
     */
    private static void configureCluster(BrokerCfg brokerCfg, int nodeId, int partitionsCount) {
        ClusterCfg clusterCfg = brokerCfg.getCluster();
        clusterCfg.setClusterSize(CLUSTER_SIZE);
        clusterCfg.setNodeId(nodeId);
        clusterCfg.setPartitionsCount(partitionsCount);
        clusterCfg.setReplicationFactor(REPLICATION_FACTOR);
    }

    /**
     * Starts both brokers on separate asynchronous tasks and blocks until both tasks completed.
     * A failure of either start surfaces through {@link CompletableFuture#join()}.
     */
    private static void startBrokers(Broker first, Broker second) {
        CompletableFuture.allOf(
                CompletableFuture.runAsync(first::start),
                CompletableFuture.runAsync(second::start))
            .join();
    }

    /**
     * Closes both brokers. The second broker is closed even when closing the first one throws, so a
     * failing shutdown of one broker cannot leave the other one running.
     */
    private static void closeBrokers(Broker first, Broker second) {
        try {
            first.close();
        } finally {
            second.close();
        }
    }

    /** Creates a plaintext client that talks to the gateway port of the given broker on localhost. */
    private static ZeebeClient newClient(Broker gatewayBroker) {
        return ZeebeClient.newClientBuilder()
                .usePlaintext()
                .gatewayAddress("localhost:" + gatewayBroker.getConfig().getGateway().getNetwork().getPort())
                .build();
    }

    /**
     * Polls the topology through {@code client} until {@link TopologyAssert} accepts it as complete
     * for the configured cluster size, the given partition count and the configured replication
     * factor.
     */
    private static void awaitCompleteTopology(ZeebeClient client, int partitionsCount) {
        Awaitility.await().untilAsserted(
                () -> TopologyAssert.assertThat(client.newTopologyRequest().send().join())
                        .isComplete(CLUSTER_SIZE, partitionsCount, REPLICATION_FACTOR));
    }

    /** Returns the partition manager of the broker, cast to the implementation that offers {@code leave}. */
    private static PartitionManagerImpl partitionManagerOf(Broker broker) {
        return (PartitionManagerImpl) broker.getBrokerContext().getPartitionManager();
    }

    /**
     * Resolves the directory the tests inspect for one partition of one broker:
     * {@code tmp/broker-<nodeId>/data/raft-partition/partitions/<partitionId>}.
     */
    private static Path partitionDirectory(Path tmp, int nodeId, int partitionId) {
        return tmp.resolve("broker-" + nodeId + "/data/raft-partition/partitions/" + partitionId);
    }

    /**
     * Builds a broker whose configuration is initialised against {@code tmp} and then customised by
     * {@code configure}.
     *
     * <p>Socket addresses are assigned before {@code configure} runs, and the actor scheduler,
     * cluster and broker client are created from the configuration after it ran.
     *
     * @param tmp the directory passed (as absolute path) to {@link BrokerCfg#init}
     * @param configure callback that adjusts the configuration before the broker is assembled
     * @return the assembled broker; it is not started here
     */
    private static Broker buildBroker(Path tmp, Consumer<BrokerCfg> configure) {
        BrokerCfg brokerCfg = new BrokerCfg();
        EmbeddedBrokerRule.assignSocketAddresses(brokerCfg);
        brokerCfg.init(tmp.toAbsolutePath().toString());
        configure.accept(brokerCfg);
        ActorScheduler actorScheduler = TestActorSchedulerFactory.ofBrokerConfig(brokerCfg);
        AtomixCluster atomixCluster = TestClusterFactory.createAtomixCluster(brokerCfg, METER_REGISTRY);
        BrokerClient brokerClient = TestBrokerClientFactory.createBrokerClient(atomixCluster, actorScheduler);
        SystemContext systemContext = new SystemContext(brokerCfg, actorScheduler, atomixCluster, brokerClient);
        return new Broker(systemContext, new SpringBrokerBridge(), List.of());
    }
}

