/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.gateway.impl.stream;

import io.camunda.zeebe.gateway.RequestMapper;
import io.camunda.zeebe.gateway.ResponseMapper;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJobImpl;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.stream.api.ClientStreamConsumer;
import io.camunda.zeebe.transport.stream.api.ClientStreamId;
import io.camunda.zeebe.transport.stream.api.ClientStreamer;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.agrona.DirectBuffer;

/**
 * Adapts gRPC {@code StreamActivatedJobs} calls onto a {@link ClientStreamer}.
 *
 * <p>For every valid request a client stream is registered through {@code jobStreamer.add(...)}. Payloads pushed
 * to that stream are deserialized and forwarded to the call's response observer, and the stream is removed again
 * once the call is closed or cancelled.
 */
public class ClientStreamAdapter {
    private final ClientStreamer<JobActivationProperties> jobStreamer;
    private final Executor executor;

    /**
     * @param jobStreamer the streamer on which job streams are registered and removed
     * @param executor the executor on which pushed jobs are handled and stream-removal callbacks are scheduled
     */
    public ClientStreamAdapter(ClientStreamer<JobActivationProperties> jobStreamer, Executor executor) {
        this.jobStreamer = jobStreamer;
        this.executor = executor;
    }

    /**
     * Validates the request and, if it is valid, registers a job stream for it.
     *
     * <p>The call is failed with {@code INVALID_ARGUMENT} when the job type is blank or the timeout is smaller
     * than one; in both cases no stream is registered.
     *
     * @param request the stream request sent by the client
     * @param responseObserver the observer of the gRPC call, which receives the activated jobs or the error
     */
    public void handle(GatewayOuterClass.StreamActivatedJobsRequest request, ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> responseObserver) {
        if (request.getType().isBlank()) {
            this.handleError(responseObserver, "type", "present", "blank");
            return;
        }
        if (request.getTimeout() < 1L) {
            this.handleError(responseObserver, "timeout", "greater than zero", Long.toString(request.getTimeout()));
            return;
        }
        this.handleInternal(request, responseObserver);
    }

    /**
     * Registers the job stream for an already validated request and wires up its removal.
     *
     * @param request the validated stream request
     * @param responseObserver the observer of the gRPC call
     */
    private void handleInternal(GatewayOuterClass.StreamActivatedJobsRequest request, ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> responseObserver) {
        JobActivationProperties jobActivationProperties = RequestMapper.toJobActivationProperties(request);
        ClientStreamConsumerImpl consumer = new ClientStreamConsumerImpl(responseObserver, this.executor);
        ActorFuture<ClientStreamId> futureId = this.jobStreamer.add(BufferUtil.wrapString(request.getType()), jobActivationProperties, consumer);

        // The same remover is installed for both handlers so the stream is removed no matter
        // whether the call is closed or cancelled.
        JobStreamRemover cleaner = new JobStreamRemover(futureId);
        responseObserver.setOnCloseHandler(cleaner);
        responseObserver.setOnCancelHandler(cleaner);
    }

    /**
     * Fails the call with an {@code INVALID_ARGUMENT} status describing the rejected request field.
     *
     * @param responseObserver the observer of the gRPC call to fail
     * @param field the name of the offending request field
     * @param expectation a description of the expected value
     * @param actual a description of the value that was received
     */
    private void handleError(ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> responseObserver, String field, String expectation, String actual) {
        String format = "Expected to stream activated jobs with %s to be %s, but it was %s";
        String errorMessage = format.formatted(field, expectation, actual);
        responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(errorMessage)));
    }

    /**
     * Stream consumer which forwards every pushed payload as an activated job to a gRPC response observer.
     */
    static final class ClientStreamConsumerImpl
    implements ClientStreamConsumer {
        private final StreamObserver<GatewayOuterClass.ActivatedJob> responseObserver;
        private final Executor executor;

        /**
         * @param responseObserver the observer which receives the activated jobs
         * @param executor the executor on which pushed payloads are handled
         */
        public ClientStreamConsumerImpl(StreamObserver<GatewayOuterClass.ActivatedJob> responseObserver, Executor executor) {
            this.responseObserver = responseObserver;
            this.executor = executor;
        }

        /**
         * Handles the pushed payload asynchronously on the configured executor.
         *
         * @param payload the serialized activated job
         * @return a future which completes once the job was handed to the response observer, or completes
         *     exceptionally if handling the payload failed
         */
        @Override
        public CompletableFuture<Void> push(DirectBuffer payload) {
            return CompletableFuture.runAsync(() -> this.handlePushedJob(payload), this.executor);
        }

        /**
         * Deserializes the payload, maps it to the gateway representation and emits it to the observer.
         *
         * @param payload the serialized activated job
         */
        private void handlePushedJob(DirectBuffer payload) {
            ActivatedJobImpl deserializedJob = new ActivatedJobImpl();
            deserializedJob.wrap(payload);
            GatewayOuterClass.ActivatedJob activatedJob = ResponseMapper.toActivatedJob(deserializedJob);
            try {
                this.responseObserver.onNext(activatedJob);
            }
            catch (Exception e) {
                // Report the failure to the client, then rethrow so the future returned by push(...)
                // completes exceptionally as well.
                this.responseObserver.onError(e);
                throw e;
            }
        }
    }

    /**
     * Removes the registered job stream once its identifier is available; installed as the close and cancel
     * handler of the gRPC call.
     */
    private final class JobStreamRemover
    implements Runnable {
        private final ActorFuture<ClientStreamId> clientStreamId;

        /**
         * @param clientStreamId the future identifier of the stream to remove
         */
        private JobStreamRemover(ActorFuture<ClientStreamId> clientStreamId) {
            this.clientStreamId = clientStreamId;
        }

        @Override
        public void run() {
            // The identifier is only available once the registration future completes, so the removal is
            // chained onto it instead of being performed directly.
            this.clientStreamId.onComplete(this::onJobStreamerId, ClientStreamAdapter.this.executor);
        }

        /**
         * Removes the stream with the given identifier unless the registration failed.
         *
         * @param id the identifier of the registered stream
         * @param error the registration failure, or {@code null} if the registration succeeded
         */
        private void onJobStreamerId(ClientStreamId id, Throwable error) {
            if (error != null) {
                // No identifier was obtained, so there is nothing to remove.
                return;
            }
            ClientStreamAdapter.this.jobStreamer.remove(id);
        }
    }
}

