/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.exporter.stream;

import io.camunda.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.camunda.zeebe.broker.exporter.stream.ExporterPhase;
import io.camunda.zeebe.broker.exporter.stream.ExporterRule;
import io.camunda.zeebe.broker.exporter.stream.ExportersState;
import io.camunda.zeebe.broker.exporter.stream.SimplePartitionMessageService;
import io.camunda.zeebe.broker.exporter.util.ControlledTestExporter;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionEvaluationListener;
import org.awaitility.core.EvaluatedCondition;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

public final class ExporterDirectorDistributionTest {
    private static final String EXPORTER_ID_1 = "exporter-1";
    private static final String EXPORTER_ID_2 = "exporter-2";
    private static final DirectBuffer EXPORTER_METADATA_1 = BufferUtil.wrapString((String)"e1");
    private static final DirectBuffer EXPORTER_METADATA_2 = BufferUtil.wrapString((String)"e2");
    private static final UnsafeBuffer NO_METADATA = new UnsafeBuffer();
    private static final Duration DISTRIBUTION_INTERVAL = Duration.ofSeconds(15L);
    private final SimplePartitionMessageService simplePartitionMessageService = new SimplePartitionMessageService();
    @Rule
    public final ExporterRule activeExporters = ExporterRule.activeExporter().withPartitionMessageService(this.simplePartitionMessageService).withDistributionInterval(DISTRIBUTION_INTERVAL);
    @Rule
    public final ExporterRule passiveExporters = ExporterRule.passiveExporter().withPartitionMessageService(this.simplePartitionMessageService);
    private final List<ControlledTestExporter> exporters = new ArrayList<ControlledTestExporter>();
    private final List<ExporterDescriptor> exporterDescriptors = new ArrayList<ExporterDescriptor>();

    @Before
    public void init() {
        this.exporters.clear();
        this.exporterDescriptors.clear();
        this.createExporter(EXPORTER_ID_1, EXPORTER_METADATA_1);
        this.createExporter(EXPORTER_ID_2, EXPORTER_METADATA_2);
    }

    @After
    public void tearDown() throws Exception {
        this.activeExporters.closeExporterDirector();
        this.passiveExporters.closeExporterDirector();
    }

    private void createExporter(String exporterId, DirectBuffer exporterMetadata) {
        ControlledTestExporter exporter = (ControlledTestExporter)Mockito.spy((Object)new ControlledTestExporter());
        ExporterDescriptor descriptor = (ExporterDescriptor)Mockito.spy((Object)new ExporterDescriptor(exporterId, exporter.getClass(), Map.of()));
        ((ExporterDescriptor)Mockito.doAnswer(c -> exporter).when((Object)descriptor)).newInstance();
        byte[] exporterMetadataBytes = BufferUtil.bufferAsArray((DirectBuffer)exporterMetadata);
        exporter.onExport(record -> exporter.getController().updateLastExportedRecordPosition(record.getPosition(), exporterMetadataBytes));
        this.exporters.add(exporter);
        this.exporterDescriptors.add(descriptor);
    }

    private void startExporters(List<ExporterDescriptor> exporterDescriptors) {
        this.activeExporters.startExporterDirector(exporterDescriptors);
        this.passiveExporters.startExporterDirector(exporterDescriptors);
    }

    @Test
    public void shouldDistributeExporterState() {
        this.startExporters(this.exporterDescriptors);
        long position = this.activeExporters.writeEvent((Intent)DeploymentIntent.CREATED, (UnifiedRecordValue)new DeploymentRecord());
        ExportersState activeExporterState = this.activeExporters.getExportersState();
        Awaitility.await((String)"Director has read all records and update the positions.").untilAsserted(() -> {
            Assertions.assertThat((long)activeExporterState.getPosition(EXPORTER_ID_1)).isEqualTo(position);
            Assertions.assertThat((Comparable)activeExporterState.getExporterMetadata(EXPORTER_ID_1)).isEqualTo((Object)EXPORTER_METADATA_1);
            Assertions.assertThat((long)activeExporterState.getPosition(EXPORTER_ID_2)).isEqualTo(position);
            Assertions.assertThat((Comparable)activeExporterState.getExporterMetadata(EXPORTER_ID_2)).isEqualTo((Object)EXPORTER_METADATA_2);
        });
        ExportersState passiveExporterState = this.passiveExporters.getExportersState();
        Assertions.assertThat((long)passiveExporterState.getPosition(EXPORTER_ID_1)).isEqualTo(-1L);
        Assertions.assertThat((Comparable)passiveExporterState.getExporterMetadata(EXPORTER_ID_1)).isEqualTo((Object)NO_METADATA);
        Assertions.assertThat((long)passiveExporterState.getPosition(EXPORTER_ID_2)).isEqualTo(-1L);
        Assertions.assertThat((Comparable)passiveExporterState.getExporterMetadata(EXPORTER_ID_2)).isEqualTo((Object)NO_METADATA);
        this.activeExporters.getClock().addTime(DISTRIBUTION_INTERVAL);
        Awaitility.await((String)"Active Director has distributed positions and passive has received it").conditionEvaluationListener((ConditionEvaluationListener)new ClockShifter(this.activeExporters.getClock())).untilAsserted(() -> {
            Assertions.assertThat((long)passiveExporterState.getPosition(EXPORTER_ID_1)).isEqualTo(position);
            Assertions.assertThat((Comparable)passiveExporterState.getExporterMetadata(EXPORTER_ID_1)).isEqualTo((Object)EXPORTER_METADATA_1);
            Assertions.assertThat((long)passiveExporterState.getPosition(EXPORTER_ID_2)).isEqualTo(position);
            Assertions.assertThat((Comparable)passiveExporterState.getExporterMetadata(EXPORTER_ID_2)).isEqualTo((Object)EXPORTER_METADATA_2);
        });
    }

    @Test
    public void shouldNotResetExporterPositionWhenOldPositionReceived() {
        this.startExporters(this.exporterDescriptors);
        Awaitility.await((String)"Exporter has recovered and started exporting.").untilAsserted(() -> Assertions.assertThat((Comparable)((ExporterPhase)this.activeExporters.getDirector().getPhase().join())).isEqualTo((Object)ExporterPhase.EXPORTING));
        long position = 10L;
        this.activeExporters.getExportersState().setPosition(EXPORTER_ID_1, 10L);
        this.activeExporters.getExportersState().setPosition(EXPORTER_ID_2, 10L);
        this.activeExporters.getClock().addTime(DISTRIBUTION_INTERVAL);
        ExportersState passiveExporterState = this.passiveExporters.getExportersState();
        Awaitility.await((String)"Active Director has distributed positions and passive has received it").conditionEvaluationListener((ConditionEvaluationListener)new ClockShifter(this.activeExporters.getClock())).untilAsserted(() -> {
            Assertions.assertThat((long)passiveExporterState.getPosition(EXPORTER_ID_1)).isEqualTo(10L);
            Assertions.assertThat((long)passiveExporterState.getPosition(EXPORTER_ID_2)).isEqualTo(10L);
        });
        this.activeExporters.getExportersState().setPosition(EXPORTER_ID_1, 9L);
        this.activeExporters.getExportersState().setPosition(EXPORTER_ID_2, 11L);
        this.activeExporters.getClock().addTime(DISTRIBUTION_INTERVAL);
        Awaitility.await((String)"Active Director has distributed positions and passive has received it").conditionEvaluationListener((ConditionEvaluationListener)new ClockShifter(this.activeExporters.getClock())).untilAsserted(() -> Assertions.assertThat((long)passiveExporterState.getPosition(EXPORTER_ID_2)).isEqualTo(11L));
        Assertions.assertThat((long)passiveExporterState.getPosition(EXPORTER_ID_1)).isEqualTo(10L);
    }

    private record ClockShifter(ControlledActorClock clock) implements ConditionEvaluationListener<Void>
    {
        public void conditionEvaluated(EvaluatedCondition<Void> condition) {
            this.clock.addTime(DISTRIBUTION_INTERVAL);
        }
    }
}

