/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.exporter.stream;

import io.camunda.zeebe.broker.exporter.stream.ExporterStateDistributeMessage;
import io.camunda.zeebe.broker.exporter.stream.ExporterStateDistributionService;
import io.camunda.zeebe.broker.exporter.stream.SimplePartitionMessageService;
import io.camunda.zeebe.broker.system.partitions.PartitionMessagingService;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.HashMap;
import java.util.Map;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link ExporterStateDistributionService} wired to an in-memory
 * {@link SimplePartitionMessageService}.
 *
 * <p>The service under test is constructed with {@code exporterState::put} as its state
 * callback, so every exporter entry the service hands to that callback ends up in the
 * {@code exporterState} map, where the tests can inspect it.
 */
public class ExporterStateDistributionTest {
    /** Topic name handed to the service under test and looked up in the messaging stub. */
    private static final String TOPIC = "topic";

    private ExporterStateDistributionService exporterStateDistributionService;
    /** Records every (exporter id, state entry) pair passed to the service's callback. */
    private Map<String, ExporterStateDistributeMessage.ExporterStateEntry> exporterState;
    private SimplePartitionMessageService partitionMessagingService;

    /** Creates a fresh state map, messaging stub and service before each test. */
    @Before
    public void setup() {
        this.exporterState = new HashMap<>();
        this.partitionMessagingService = new SimplePartitionMessageService();
        this.exporterStateDistributionService =
            new ExporterStateDistributionService(
                this.exporterState::put, this.partitionMessagingService, TOPIC);
    }

    /** Subscribing must register a consumer under the configured topic. */
    @Test
    public void shouldSubscribeForGivenTopic() {
        // when
        // NOTE(review): Runnable::run presumably acts as a run-inline executor - confirm.
        this.exporterStateDistributionService.subscribeForExporterState(Runnable::run);

        // then
        Assertions.assertThat(this.partitionMessagingService.consumers).containsKey(TOPIC);
    }

    /** A distributed message must reach the subscriber and populate the state map. */
    @Test
    public void shouldConsumeExporterMessage() {
        // given
        DirectBuffer metadata1 = BufferUtil.wrapString("e1");
        DirectBuffer metadata2 = BufferUtil.wrapString("e2");
        ExporterStateDistributeMessage exporterPositionsMessage =
            new ExporterStateDistributeMessage();
        exporterPositionsMessage.putExporter("elastic", 123L, metadata1);
        exporterPositionsMessage.putExporter("metric", 345L, metadata2);
        this.exporterStateDistributionService.subscribeForExporterState(Runnable::run);

        // when
        this.exporterStateDistributionService.distributeExporterState(exporterPositionsMessage);

        // then
        Assertions.assertThat(this.exporterState)
            .containsEntry(
                "elastic", new ExporterStateDistributeMessage.ExporterStateEntry(123L, metadata1))
            .containsEntry(
                "metric", new ExporterStateDistributeMessage.ExporterStateEntry(345L, metadata2));
    }

    /** Closing the service must leave no consumers registered in the messaging stub. */
    @Test
    public void shouldRemoveSubscriptionOnClose() throws Exception {
        // given
        this.exporterStateDistributionService.subscribeForExporterState(Runnable::run);

        // when
        this.exporterStateDistributionService.close();

        // then
        Assertions.assertThat(this.partitionMessagingService.consumers).isEmpty();
    }
}

