/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.system.partitions.impl.steps;

import io.atomix.raft.RaftServer;
import io.camunda.zeebe.backup.processing.CheckpointRecordsProcessor;
import io.camunda.zeebe.broker.engine.impl.BoundedScheduledCommandCache;
import io.camunda.zeebe.broker.engine.impl.ScheduledCommandCacheMetrics;
import io.camunda.zeebe.broker.system.configuration.ExperimentalCfg;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.EngineConfiguration;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.stream.api.EventFilter;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.api.scheduling.ScheduledCommandCache;
import io.camunda.zeebe.stream.impl.SkipPositionsFilter;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.camunda.zeebe.stream.impl.StreamProcessorListener;
import io.camunda.zeebe.stream.impl.StreamProcessorMode;
import io.camunda.zeebe.util.health.HealthMonitorable;
import java.util.List;
import java.util.function.BiFunction;

/**
 * Partition transition step that manages the {@link StreamProcessor} stored in the
 * {@link PartitionTransitionContext}.
 *
 * <p>On a role change the step pauses the current stream processor, closes it and clears it from
 * the context during {@link #prepareTransition}, and creates, opens and registers a new one during
 * {@link #transitionTo}. Whether a new instance is needed is decided by
 * {@link #shouldInstallOnTransition(RaftServer.Role, RaftServer.Role)}.
 */
public final class StreamProcessorTransitionStep implements PartitionTransitionStep {
    /** Factory used to build the stream processor for a given context and target role. */
    private final BiFunction<PartitionTransitionContext, RaftServer.Role, StreamProcessor> streamProcessorCreator;

    /** Creates a step that builds stream processors via {@link #createStreamProcessor}. */
    public StreamProcessorTransitionStep() {
        this(StreamProcessorTransitionStep::createStreamProcessor);
    }

    /**
     * Creates a step that uses the given factory to build stream processors.
     *
     * @param streamProcessorCreator factory invoked with the transition context and the target role
     */
    public StreamProcessorTransitionStep(BiFunction<PartitionTransitionContext, RaftServer.Role, StreamProcessor> streamProcessorCreator) {
        this.streamProcessorCreator = streamProcessorCreator;
    }

    /**
     * Pauses the current stream processor, if there is one, when the new role requires it to be
     * replaced or the partition becomes inactive.
     *
     * @param context transition context holding the current role and stream processor
     * @param newRole the role that was just announced
     */
    @Override
    public void onNewRaftRole(PartitionTransitionContext context, RaftServer.Role newRole) {
        final RaftServer.Role currentRole = context.getCurrentRole();
        final StreamProcessor streamProcessor = context.getStreamProcessor();
        if (streamProcessor == null) {
            return;
        }
        if (this.shouldCloseOnTransition(newRole, currentRole)) {
            streamProcessor.pauseProcessing();
        }
    }

    /**
     * Closes the current stream processor when the target role requires a new one or the partition
     * becomes inactive.
     *
     * <p>The processor is removed from the component health monitor before it is closed, and it is
     * cleared from the context only if closing completed without an error.
     *
     * @param context transition context holding the current role and stream processor
     * @param term the term of the transition (not used by this step)
     * @param targetRole the role being transitioned to
     * @return the close future of the stream processor, or an already completed future when there
     *     is nothing to close
     */
    @Override
    public ActorFuture<Void> prepareTransition(PartitionTransitionContext context, long term, RaftServer.Role targetRole) {
        final ConcurrencyControl concurrencyControl = context.getConcurrencyControl();
        final RaftServer.Role currentRole = context.getCurrentRole();
        final StreamProcessor streamProcessor = context.getStreamProcessor();
        if (streamProcessor != null && this.shouldCloseOnTransition(targetRole, currentRole)) {
            context.getComponentHealthMonitor().removeComponent(streamProcessor.getName());
            final ActorFuture<Void> closeFuture = streamProcessor.closeAsync();
            closeFuture.onComplete((success, error) -> {
                // Keep the reference on failure so the context still points at the unclosed instance.
                if (error == null) {
                    context.setStreamProcessor(null);
                }
            });
            return closeFuture;
        }
        return concurrencyControl.createCompletedFuture();
    }

    /**
     * Creates and opens a new stream processor when the transition requires one, or when the
     * context has none and the target role is not {@code INACTIVE}.
     *
     * <p>The new processor is stored in the context before it is opened. Once opening succeeds it
     * is paused or resumed according to {@code context.shouldProcess()} and registered with the
     * component health monitor; an open failure is propagated through the returned future.
     *
     * @param context transition context that receives the new stream processor
     * @param term the term of the transition (not used by this step)
     * @param targetRole the role being transitioned to
     * @return a future completed once the processor is opened and registered, completed
     *     exceptionally if opening failed, or an already completed future when nothing is installed
     */
    @Override
    public ActorFuture<Void> transitionTo(PartitionTransitionContext context, long term, RaftServer.Role targetRole) {
        final RaftServer.Role currentRole = context.getCurrentRole();
        final ConcurrencyControl concurrencyControl = context.getConcurrencyControl();
        final boolean install = this.shouldInstallOnTransition(targetRole, currentRole)
            || (context.getStreamProcessor() == null && targetRole != RaftServer.Role.INACTIVE);
        if (!install) {
            return concurrencyControl.createCompletedFuture();
        }

        final StreamProcessor streamProcessor = this.streamProcessorCreator.apply(context, targetRole);
        context.setStreamProcessor(streamProcessor);
        // The processor is opened paused when the context says it should not process.
        final ActorFuture<Void> openFuture = streamProcessor.openAsync(!context.shouldProcess());
        final ActorFuture<Void> future = concurrencyControl.createFuture();
        openFuture.onComplete((nothing, err) -> {
            if (err != null) {
                future.completeExceptionally(err);
                return;
            }
            // shouldProcess() is read again here, after the asynchronous open has finished.
            if (!context.shouldProcess()) {
                streamProcessor.pauseProcessing();
            } else {
                streamProcessor.resumeProcessing();
            }
            context.getComponentHealthMonitor().registerComponent(streamProcessor.getName(), (HealthMonitorable)streamProcessor);
            future.complete(null);
        });
        return future;
    }

    /** Returns the name of this step, {@code "StreamProcessor"}. */
    @Override
    public String getName() {
        return "StreamProcessor";
    }

    /**
     * Decides whether a new stream processor must be installed for the given role change.
     *
     * <p>Returns {@code true} for any transition to {@code LEADER}, for a transition to
     * {@code FOLLOWER} unless coming from {@code CANDIDATE}, and for a transition to
     * {@code CANDIDATE} unless coming from {@code FOLLOWER}. {@link #createStreamProcessor} uses
     * replay mode for both {@code FOLLOWER} and {@code CANDIDATE}, so switching between those two
     * roles keeps the existing instance.
     *
     * @param newRole the role being transitioned to
     * @param currentRole the role currently held
     * @return {@code true} if a new stream processor should be installed
     */
    private boolean shouldInstallOnTransition(RaftServer.Role newRole, RaftServer.Role currentRole) {
        return newRole == RaftServer.Role.LEADER
            || (newRole == RaftServer.Role.FOLLOWER && currentRole != RaftServer.Role.CANDIDATE)
            || (newRole == RaftServer.Role.CANDIDATE && currentRole != RaftServer.Role.FOLLOWER);
    }

    /**
     * Decides whether the existing stream processor must be paused and closed for the given role
     * change: either it is going to be replaced, or the partition becomes {@code INACTIVE}.
     */
    private boolean shouldCloseOnTransition(RaftServer.Role newRole, RaftServer.Role currentRole) {
        return this.shouldInstallOnTransition(newRole, currentRole) || newRole == RaftServer.Role.INACTIVE;
    }

    /**
     * Default factory: builds a stream processor from the components and configuration exposed by
     * the context.
     *
     * <p>The processor runs in {@code PROCESSING} mode when the target role is {@code LEADER} and
     * in {@code REPLAY} mode otherwise. Its record processors are a new {@link Engine} followed by
     * the context's checkpoint processor.
     *
     * @param context transition context supplying log stream, database, configuration and listeners
     * @param targetRole the role the partition is transitioning to
     * @return the built, not yet opened, stream processor
     */
    private static StreamProcessor createStreamProcessor(final PartitionTransitionContext context, RaftServer.Role targetRole) {
        final StreamProcessorMode streamProcessorMode = targetRole == RaftServer.Role.LEADER
            ? StreamProcessorMode.PROCESSING
            : StreamProcessorMode.REPLAY;
        final ExperimentalCfg experimentalCfg = context.getBrokerCfg().getExperimental();
        final EngineConfiguration engineCfg = experimentalCfg.getEngine().createEngineConfiguration();
        final Engine engine = new Engine(context.getTypedRecordProcessorFactory(), engineCfg);
        // Cache bounded to the scheduled command intents listed here.
        final ScheduledCommandCache.StageableScheduledCommandCache scheduledCommandCache = BoundedScheduledCommandCache.ofIntent(
            new ScheduledCommandCacheMetrics.BoundedCommandCacheMetrics(context.getPartitionTransitionMeterRegistry()),
            new Intent[]{TimerIntent.TRIGGER, JobIntent.TIME_OUT, JobIntent.RECUR_AFTER_BACKOFF, MessageIntent.EXPIRE});
        final EventFilter processingFilter = SkipPositionsFilter.of(context.getBrokerCfg().getProcessing().skipPositions());
        return StreamProcessor.builder()
            .meterRegistry(context.getPartitionTransitionMeterRegistry())
            .logStream(context.getLogStream())
            .actorSchedulingService(context.getActorSchedulingService())
            .zeebeDb(context.getZeebeDb())
            // Passed inline so the element type is inferred from the builder's parameter type.
            .recordProcessors(List.of(engine, context.getCheckpointProcessor()))
            .nodeId(context.getNodeId())
            .commandResponseWriter(context.getCommandResponseWriter())
            .maxCommandsInBatch(context.getBrokerCfg().getProcessing().getMaxCommandsInBatch())
            .setEnableAsyncScheduledTasks(context.getBrokerCfg().getProcessing().isEnableAsyncScheduledTasks())
            .processingFilter(processingFilter)
            .listener(new StreamProcessorListener() {

                /** Forwards every processed command to the listener currently held by the context. */
                @Override
                public void onProcessed(TypedRecord<?> processedCommand) {
                    context.getOnProcessedListener().accept(processedCommand);
                }

                /** Skipped records are intentionally ignored. */
                @Override
                public void onSkipped(LoggedEvent skippedRecord) {
                }
            })
            .streamProcessorMode(streamProcessorMode)
            .partitionCommandSender((InterPartitionCommandSender)context.getPartitionCommandSender())
            .scheduledCommandCache(scheduledCommandCache)
            .build();
    }
}

