/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.bootstrap;

import io.atomix.utils.event.EventListener;
import io.camunda.zeebe.broker.bootstrap.AbstractBrokerStartupStep;
import io.camunda.zeebe.broker.bootstrap.BrokerStartupContext;
import io.camunda.zeebe.broker.clustering.ClusterServicesImpl;
import io.camunda.zeebe.broker.jobstream.JobStreamMetrics;
import io.camunda.zeebe.broker.jobstream.JobStreamService;
import io.camunda.zeebe.broker.jobstream.RemoteJobStreamErrorHandlerService;
import io.camunda.zeebe.broker.jobstream.RemoteJobStreamer;
import io.camunda.zeebe.broker.jobstream.YieldingJobStreamErrorHandler;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationPropertiesImpl;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.TransportFactory;
import io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler;
import io.camunda.zeebe.transport.stream.api.RemoteStreamMetrics;
import io.camunda.zeebe.transport.stream.api.RemoteStreamService;
import io.camunda.zeebe.transport.stream.api.RemoteStreamer;
import java.util.Collection;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;

public final class JobStreamServiceStep
extends AbstractBrokerStartupStep {
    @Override
    void startupInternal(BrokerStartupContext brokerStartupContext, ConcurrencyControl concurrencyControl, ActorFuture<BrokerStartupContext> startupFuture) {
        ClusterServicesImpl clusterServices = brokerStartupContext.getClusterServices();
        RemoteJobStreamErrorHandlerService errorHandlerService = new RemoteJobStreamErrorHandlerService(new YieldingJobStreamErrorHandler());
        ActorSchedulingService scheduler = brokerStartupContext.getActorSchedulingService();
        RemoteStreamService remoteStreamService = new TransportFactory(scheduler).createRemoteStreamServer(clusterServices.getCommunicationService(), JobStreamServiceStep::readJobActivationProperties, (RemoteStreamErrorHandler)errorHandlerService, (RemoteStreamMetrics)new JobStreamMetrics(brokerStartupContext.getMeterRegistry()));
        ActorFuture errorHandlerStarted = scheduler.submitActor((Actor)errorHandlerService);
        errorHandlerStarted.onComplete((ok, err) -> {
            if (err != null) {
                startupFuture.completeExceptionally(err);
                return;
            }
            remoteStreamService.start(scheduler, concurrencyControl).onComplete((streamer, error) -> {
                if (error != null) {
                    startupFuture.completeExceptionally(error);
                    return;
                }
                JobStreamService jobStreamService = new JobStreamService((RemoteStreamService<JobActivationProperties, ActivatedJob>)remoteStreamService, new RemoteJobStreamer((RemoteStreamer<JobActivationProperties, ActivatedJob>)streamer, clusterServices.getEventService()), errorHandlerService);
                clusterServices.getMembershipService().addListener((EventListener)remoteStreamService);
                brokerStartupContext.addPartitionListener(errorHandlerService);
                brokerStartupContext.setJobStreamService(jobStreamService);
                brokerStartupContext.getSpringBrokerBridge().registerJobStreamServiceSupplier(() -> jobStreamService);
                startupFuture.complete((Object)brokerStartupContext);
            });
        });
    }

    @Override
    void shutdownInternal(BrokerStartupContext brokerShutdownContext, ConcurrencyControl concurrencyControl, ActorFuture<BrokerStartupContext> shutdownFuture) {
        JobStreamService service = brokerShutdownContext.getJobStreamService();
        if (service != null) {
            brokerShutdownContext.getClusterServices().getMembershipService().removeListener(service.remoteStreamService());
            brokerShutdownContext.removePartitionListener(service.errorHandlerService());
            service.closeAsync(concurrencyControl).onComplete((ok, error) -> {
                if (error != null) {
                    shutdownFuture.completeExceptionally(error);
                } else {
                    brokerShutdownContext.getClusterServices().getMembershipService().removeListener(service.remoteStreamService());
                    brokerShutdownContext.setJobStreamService(null);
                    brokerShutdownContext.getSpringBrokerBridge().registerJobStreamServiceSupplier(null);
                    shutdownFuture.complete((Object)brokerShutdownContext);
                }
            });
        }
    }

    static JobActivationProperties readJobActivationProperties(DirectBuffer buffer) {
        JobActivationPropertiesImpl mutable = new JobActivationPropertiesImpl();
        mutable.wrap(buffer);
        return new ImmutableJobActivationPropertiesImpl(mutable.worker(), mutable.timeout(), mutable.fetchVariables(), mutable.tenantIds());
    }

    public String getName() {
        return "JobStreamService";
    }

    private record ImmutableJobActivationPropertiesImpl(DirectBuffer worker, long timeout, Collection<DirectBuffer> fetchVariables, Collection<String> tenantIds) implements JobActivationProperties
    {
        public int getLength() {
            throw new UnsupportedOperationException();
        }

        public void write(MutableDirectBuffer buffer, int offset) {
            throw new UnsupportedOperationException();
        }
    }
}

