/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.process.test.engine;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.impl.ZeebeObjectMapper;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.process.test.api.RecordStreamSource;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.engine.EngineStateMonitor;
import io.camunda.zeebe.process.test.engine.GrpcToLogStreamGateway;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.grpc.Server;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ZeebeTestEngine} implementation that ties together the components handed to its
 * constructor: a gRPC server, a stream processor, the gRPC-to-log-stream gateway, the database,
 * the log stream, the actor scheduler, a controllable clock and an engine state monitor.
 *
 * <p>The engine does not create any of these components itself; it only drives their lifecycle
 * ({@link #start()} / {@link #stop()}) and exposes convenience accessors for tests.
 */
public class InMemoryEngine
implements ZeebeTestEngine {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryEngine.class);
    private final Server grpcServer;
    private final StreamProcessor streamProcessor;
    private final GrpcToLogStreamGateway gateway;
    private final ZeebeDb<ZbColumnFamilies> database;
    private final LogStream logStream;
    private final ActorScheduler scheduler;
    private final RecordStreamSource recordStream;
    private final ControlledActorClock clock;
    private final EngineStateMonitor engineStateMonitor;

    /**
     * Creates an engine from already constructed components.
     *
     * @param grpcServer server that is started in {@link #start()} and shut down in {@link #stop()}
     * @param streamProcessor processor that is opened in {@link #start()} and closed in {@link #stop()}
     * @param gateway gateway whose address is returned by {@link #getGatewayAddress()}
     * @param database database that is closed in {@link #stop()}
     * @param logStream log stream that is closed in {@link #stop()}
     * @param scheduler scheduler that is stopped in {@link #stop()}
     * @param recordStream source returned by {@link #getRecordStreamSource()}
     * @param clock clock that is advanced by {@link #increaseTime(Duration)}
     * @param engineStateMonitor monitor used to wait for idle and busy states
     */
    public InMemoryEngine(Server grpcServer, StreamProcessor streamProcessor, GrpcToLogStreamGateway gateway, ZeebeDb<ZbColumnFamilies> database, LogStream logStream, ActorScheduler scheduler, RecordStreamSource recordStream, ControlledActorClock clock, EngineStateMonitor engineStateMonitor) {
        this.grpcServer = grpcServer;
        this.streamProcessor = streamProcessor;
        this.gateway = gateway;
        this.database = database;
        this.logStream = logStream;
        this.scheduler = scheduler;
        this.recordStream = recordStream;
        this.clock = clock;
        this.engineStateMonitor = engineStateMonitor;
    }

    /**
     * Starts the gRPC server and then opens the stream processor, blocking until the open call
     * has completed.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the gRPC server fails to start
     */
    @Override
    public void start() {
        try {
            this.grpcServer.start();
            this.streamProcessor.openAsync(false).join();
        }
        catch (IOException e) {
            LOG.error("Failed starting in memory engine", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Shuts the engine down: the gRPC server is shut down and awaited, then the stream processor,
     * database and log stream are closed, and finally the scheduler is stopped.
     *
     * <p>The steps run strictly in that order and the first failure aborts the sequence.
     *
     * @throws RuntimeException wrapping whatever exception a shutdown step raised; if that
     *     exception is an {@link InterruptedException} the thread's interrupt flag is restored
     *     before rethrowing
     */
    @Override
    public void stop() {
        try {
            this.grpcServer.shutdownNow();
            this.grpcServer.awaitTermination();
            this.streamProcessor.close();
            this.database.close();
            this.logStream.close();
            this.scheduler.stop();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping the exception would otherwise hide the interruption from callers
                // further up the stack, so re-assert the flag before rethrowing.
                Thread.currentThread().interrupt();
            }
            LOG.error("Failed stopping in memory engine", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the record stream source supplied at construction time
     */
    @Override
    public RecordStreamSource getRecordStreamSource() {
        return this.recordStream;
    }

    /**
     * Builds a plaintext client that targets {@link #getGatewayAddress()} with environment
     * variable overrides disabled.
     *
     * @return a newly built client; this method does not retain a reference to it
     */
    @Override
    public ZeebeClient createClient() {
        return ZeebeClient.newClientBuilder().applyEnvironmentVariableOverrides(false).gatewayAddress(this.getGatewayAddress()).usePlaintext().build();
    }

    /**
     * Builds a plaintext client like {@link #createClient()}, but with a JSON mapper backed by
     * the given Jackson {@link ObjectMapper}.
     *
     * @param objectMapper mapper wrapped in a {@link ZeebeObjectMapper} for the client
     * @return a newly built client; this method does not retain a reference to it
     */
    @Override
    public ZeebeClient createClient(ObjectMapper objectMapper) {
        return ZeebeClient.newClientBuilder().withJsonMapper(new ZeebeObjectMapper(objectMapper)).applyEnvironmentVariableOverrides(false).gatewayAddress(this.getGatewayAddress()).usePlaintext().build();
    }

    /**
     * @return the address reported by the gateway
     */
    @Override
    public String getGatewayAddress() {
        return this.gateway.getAddress();
    }

    /**
     * Advances the controlled clock.
     *
     * @param timeToAdd amount of time added to the clock
     */
    @Override
    public void increaseTime(Duration timeToAdd) {
        this.clock.addTime(timeToAdd);
    }

    /**
     * Registers an idle callback with the engine state monitor and blocks until that callback
     * has fired.
     *
     * @param timeout maximum time to wait
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the callback has not fired within {@code timeout}
     */
    @Override
    public void waitForIdleState(Duration timeout) throws InterruptedException, TimeoutException {
        CompletableFuture<Void> idleState = new CompletableFuture<>();
        this.engineStateMonitor.addOnIdleCallback(() -> idleState.complete(null));
        InMemoryEngine.awaitSignal(idleState, timeout);
    }

    /**
     * Registers a processing callback with the engine state monitor and blocks until that
     * callback has fired.
     *
     * @param timeout maximum time to wait
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the callback has not fired within {@code timeout}
     */
    @Override
    public void waitForBusyState(Duration timeout) throws InterruptedException, TimeoutException {
        CompletableFuture<Void> processingState = new CompletableFuture<>();
        this.engineStateMonitor.addOnProcessingCallback(() -> processingState.complete(null));
        InMemoryEngine.awaitSignal(processingState, timeout);
    }

    /**
     * Blocks until {@code signal} completes or {@code timeout} elapses.
     */
    private static void awaitSignal(CompletableFuture<Void> signal, Duration timeout) throws InterruptedException, TimeoutException {
        try {
            signal.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException ignored) {
            // Cannot happen: the futures passed in here are only ever completed normally via
            // complete(null), never exceptionally, so get() has no cause to wrap.
        }
    }
}

