/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.gateway;

import io.atomix.utils.net.Address;
import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.RequestMapper;
import io.camunda.zeebe.gateway.ResponseMapper;
import io.camunda.zeebe.gateway.grpc.ServerStreamObserver;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.RequestRetryHandler;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerRequest;
import io.camunda.zeebe.gateway.impl.job.ActivateJobsHandler;
import io.camunda.zeebe.gateway.impl.stream.ClientStreamAdapter;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
import io.camunda.zeebe.protocol.record.PartitionHealthStatus;
import io.camunda.zeebe.transport.stream.api.ClientStreamer;
import io.camunda.zeebe.util.VersionUtil;
import io.grpc.stub.ServerCallStreamObserver;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;

/**
 * Entry point for gateway gRPC calls.
 *
 * <p>Most operations follow the same shape: the incoming gRPC request is converted into a
 * {@link BrokerRequest} via {@link RequestMapper}, sent through the {@link BrokerClient} (or the
 * {@link RequestRetryHandler} for process instance creation), and the broker response is converted
 * back via {@link ResponseMapper} and pushed to the caller's stream observer. Job activation and
 * job streaming are delegated to dedicated handlers, and {@link #topology} is answered from the
 * state currently held by the {@link BrokerTopologyManager}.
 */
public final class EndpointManager {
    private final BrokerClient brokerClient;
    private final BrokerTopologyManager topologyManager;
    private final ActivateJobsHandler activateJobsHandler;
    private final RequestRetryHandler requestRetryHandler;
    private final ClientStreamAdapter clientStreamAdapter;

    /**
     * Creates the endpoint manager.
     *
     * @param brokerClient client used to send requests; also supplies the topology manager
     * @param activateJobsHandler handler that {@link #activateJobs} delegates to
     * @param jobStreamer streamer wrapped by the adapter that serves {@link #streamActivatedJobs}
     * @param executor executor handed to the job stream adapter
     */
    public EndpointManager(BrokerClient brokerClient, ActivateJobsHandler activateJobsHandler, ClientStreamer<JobActivationProperties> jobStreamer, Executor executor) {
        this.brokerClient = brokerClient;
        this.activateJobsHandler = activateJobsHandler;
        this.clientStreamAdapter = new ClientStreamAdapter(jobStreamer, executor);
        this.topologyManager = brokerClient.getTopologyManager();
        this.requestRetryHandler = new RequestRetryHandler(brokerClient, this.topologyManager);
    }

    /**
     * Fills in node id, host, port and version of a broker on the given builder.
     *
     * @param brokerInfo builder to populate
     * @param brokerId id of the broker to describe
     * @param topology cluster state the address and version are read from
     */
    private void addBrokerInfo(GatewayOuterClass.BrokerInfo.Builder brokerInfo, Integer brokerId, BrokerClusterState topology) {
        String brokerAddress = topology.getBrokerAddress(brokerId);
        Address address = Address.from(brokerAddress);
        brokerInfo
            .setNodeId(brokerId)
            .setHost(address.host())
            .setPort(address.port())
            .setVersion(topology.getBrokerVersion(brokerId));
    }

    /**
     * Adds one partition entry to the broker builder for every partition in which the broker has a
     * role (leader, follower or inactive). Partitions in which the broker has no role are skipped.
     *
     * @param brokerInfo builder receiving the partition entries
     * @param brokerId id of the broker being described
     * @param topology cluster state the roles and health are read from
     */
    private void addPartitionInfoToBrokerInfo(GatewayOuterClass.BrokerInfo.Builder brokerInfo, Integer brokerId, BrokerClusterState topology) {
        topology.getPartitions().forEach(partitionId -> {
            GatewayOuterClass.Partition.Builder partitionBuilder = GatewayOuterClass.Partition.newBuilder().setPartitionId(partitionId);
            if (!this.setRole(brokerId, partitionId, topology, partitionBuilder)) {
                // The broker does not take part in this partition; do not report it.
                return;
            }
            PartitionHealthStatus status = topology.getPartitionHealth(brokerId, partitionId);
            switch (status) {
                case HEALTHY: {
                    partitionBuilder.setHealth(GatewayOuterClass.Partition.PartitionBrokerHealth.HEALTHY);
                    break;
                }
                case UNHEALTHY: {
                    partitionBuilder.setHealth(GatewayOuterClass.Partition.PartitionBrokerHealth.UNHEALTHY);
                    break;
                }
                case DEAD: {
                    partitionBuilder.setHealth(GatewayOuterClass.Partition.PartitionBrokerHealth.DEAD);
                    break;
                }
                default: {
                    // Unknown status: the partition is still reported, just without an explicit health.
                    Loggers.GATEWAY_LOGGER.debug("Unsupported partition broker health status '{}'", status.name());
                }
            }
            brokerInfo.addPartitions(partitionBuilder);
        });
    }

    /**
     * Sets the role the broker plays in the given partition on the partition builder.
     *
     * @param brokerId id of the broker
     * @param partitionId id of the partition
     * @param topology cluster state the leader, followers and inactive nodes are read from
     * @param partitionBuilder builder whose role is set when one is found
     * @return {@code true} if the broker is leader, follower or inactive for the partition and the
     *     role was set; {@code false} if it has none of these roles (builder left untouched)
     */
    private boolean setRole(Integer brokerId, Integer partitionId, BrokerClusterState topology, GatewayOuterClass.Partition.Builder partitionBuilder) {
        int partitionLeader = topology.getLeaderForPartition(partitionId);
        Set<Integer> partitionFollowers = topology.getFollowersForPartition(partitionId);
        Set<Integer> partitionInactives = topology.getInactiveNodesForPartition(partitionId);
        if (partitionLeader == brokerId) {
            partitionBuilder.setRole(GatewayOuterClass.Partition.PartitionBrokerRole.LEADER);
        } else if (partitionFollowers != null && partitionFollowers.contains(brokerId)) {
            partitionBuilder.setRole(GatewayOuterClass.Partition.PartitionBrokerRole.FOLLOWER);
        } else if (partitionInactives != null && partitionInactives.contains(brokerId)) {
            partitionBuilder.setRole(GatewayOuterClass.Partition.PartitionBrokerRole.INACTIVE);
        } else {
            return false;
        }
        return true;
    }

    /** Delegates the job stream request to the client stream adapter. */
    public void streamActivatedJobs(GatewayOuterClass.StreamActivatedJobsRequest request, ServerCallStreamObserver<GatewayOuterClass.ActivatedJob> responseObserver) {
        this.clientStreamAdapter.handle(request, responseObserver);
    }

    /** Delegates the job activation request to the configured {@link ActivateJobsHandler}. */
    public void activateJobs(GatewayOuterClass.ActivateJobsRequest request, ServerStreamObserver<GatewayOuterClass.ActivateJobsResponse> responseObserver) {
        this.activateJobsHandler.activateJobs(request, responseObserver);
    }

    /** Maps and sends a cancel-process-instance request through the broker client. */
    public void cancelProcessInstance(GatewayOuterClass.CancelProcessInstanceRequest request, ServerStreamObserver<GatewayOuterClass.CancelProcessInstanceResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toCancelProcessInstanceRequest, ResponseMapper::toCancelProcessInstanceResponse, responseObserver);
    }

    /** Maps and sends a complete-job request through the broker client. */
    public void completeJob(GatewayOuterClass.CompleteJobRequest request, ServerStreamObserver<GatewayOuterClass.CompleteJobResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toCompleteJobRequest, ResponseMapper::toCompleteJobResponse, responseObserver);
    }

    /** Maps and sends a create-process-instance request through the request retry handler. */
    public void createProcessInstance(GatewayOuterClass.CreateProcessInstanceRequest request, ServerStreamObserver<GatewayOuterClass.CreateProcessInstanceResponse> responseObserver) {
        this.sendRequestWithRetryPartitions(request, RequestMapper::toCreateProcessInstanceRequest, ResponseMapper::toCreateProcessInstanceResponse, responseObserver);
    }

    /**
     * Maps and sends a create-process-instance-with-result request through the request retry
     * handler. A positive request timeout (interpreted as milliseconds) is forwarded explicitly;
     * otherwise the handler is called without a timeout argument.
     */
    public void createProcessInstanceWithResult(GatewayOuterClass.CreateProcessInstanceWithResultRequest request, ServerStreamObserver<GatewayOuterClass.CreateProcessInstanceWithResultResponse> responseObserver) {
        if (request.getRequestTimeout() > 0L) {
            this.sendRequestWithRetryPartitions(request, RequestMapper::toCreateProcessInstanceWithResultRequest, ResponseMapper::toCreateProcessInstanceWithResultResponse, responseObserver, Duration.ofMillis(request.getRequestTimeout()));
        } else {
            this.sendRequestWithRetryPartitions(request, RequestMapper::toCreateProcessInstanceWithResultRequest, ResponseMapper::toCreateProcessInstanceWithResultResponse, responseObserver);
        }
    }

    /** Maps and sends an evaluate-decision request through the broker client. */
    public void evaluateDecision(GatewayOuterClass.EvaluateDecisionRequest request, ServerStreamObserver<GatewayOuterClass.EvaluateDecisionResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toEvaluateDecisionRequest, ResponseMapper::toEvaluateDecisionResponse, responseObserver);
    }

    /** Maps and sends a deploy-process request through the broker client. */
    public void deployProcess(GatewayOuterClass.DeployProcessRequest request, ServerStreamObserver<GatewayOuterClass.DeployProcessResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toDeployProcessRequest, ResponseMapper::toDeployProcessResponse, responseObserver);
    }

    /** Maps and sends a deploy-resource request through the broker client. */
    public void deployResource(GatewayOuterClass.DeployResourceRequest request, ServerStreamObserver<GatewayOuterClass.DeployResourceResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toDeployResourceRequest, ResponseMapper::toDeployResourceResponse, responseObserver);
    }

    /** Maps and sends a fail-job request through the broker client. */
    public void failJob(GatewayOuterClass.FailJobRequest request, ServerStreamObserver<GatewayOuterClass.FailJobResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toFailJobRequest, ResponseMapper::toFailJobResponse, responseObserver);
    }

    /** Maps and sends a throw-error request through the broker client. */
    public void throwError(GatewayOuterClass.ThrowErrorRequest request, ServerStreamObserver<GatewayOuterClass.ThrowErrorResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toThrowErrorRequest, ResponseMapper::toThrowErrorResponse, responseObserver);
    }

    /** Maps and sends a publish-message request through the broker client. */
    public void publishMessage(GatewayOuterClass.PublishMessageRequest request, ServerStreamObserver<GatewayOuterClass.PublishMessageResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toPublishMessageRequest, ResponseMapper::toPublishMessageResponse, responseObserver);
    }

    /** Maps and sends a resolve-incident request through the broker client. */
    public void resolveIncident(GatewayOuterClass.ResolveIncidentRequest request, ServerStreamObserver<GatewayOuterClass.ResolveIncidentResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toResolveIncidentRequest, ResponseMapper::toResolveIncidentResponse, responseObserver);
    }

    /** Maps and sends a set-variables request through the broker client. */
    public void setVariables(GatewayOuterClass.SetVariablesRequest request, ServerStreamObserver<GatewayOuterClass.SetVariablesResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toSetVariablesRequest, ResponseMapper::toSetVariablesResponse, responseObserver);
    }

    /**
     * Builds a topology response from the topology manager's current cluster state and completes
     * the observer with it.
     *
     * <p>The gateway version is included only when it is non-blank. When no cluster state is
     * available the response carries no cluster figures and no brokers.
     *
     * @param responseObserver observer that receives the single response and the completion signal
     */
    public void topology(ServerStreamObserver<GatewayOuterClass.TopologyResponse> responseObserver) {
        GatewayOuterClass.TopologyResponse.Builder topologyResponseBuilder = GatewayOuterClass.TopologyResponse.newBuilder();
        BrokerClusterState topology = this.topologyManager.getTopology();
        String gatewayVersion = VersionUtil.getVersion();
        if (gatewayVersion != null && !gatewayVersion.isBlank()) {
            topologyResponseBuilder.setGatewayVersion(gatewayVersion);
        }
        ArrayList<GatewayOuterClass.BrokerInfo> brokers = new ArrayList<>();
        if (topology != null) {
            topologyResponseBuilder
                .setClusterSize(topology.getClusterSize())
                .setPartitionsCount(topology.getPartitionsCount())
                .setReplicationFactor(topology.getReplicationFactor());
            topology.getBrokers().forEach(brokerId -> {
                GatewayOuterClass.BrokerInfo.Builder brokerInfo = GatewayOuterClass.BrokerInfo.newBuilder();
                this.addBrokerInfo(brokerInfo, brokerId, topology);
                this.addPartitionInfoToBrokerInfo(brokerInfo, brokerId, topology);
                brokers.add(brokerInfo.build());
            });
        }
        topologyResponseBuilder.addAllBrokers(brokers);
        GatewayOuterClass.TopologyResponse response = topologyResponseBuilder.build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    /** Maps and sends an update-job-retries request through the broker client. */
    public void updateJobRetries(GatewayOuterClass.UpdateJobRetriesRequest request, ServerStreamObserver<GatewayOuterClass.UpdateJobRetriesResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toUpdateJobRetriesRequest, ResponseMapper::toUpdateJobRetriesResponse, responseObserver);
    }

    /** Maps and sends a modify-process-instance request through the broker client. */
    public void modifyProcessInstance(GatewayOuterClass.ModifyProcessInstanceRequest request, ServerStreamObserver<GatewayOuterClass.ModifyProcessInstanceResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toModifyProcessInstanceRequest, ResponseMapper::toModifyProcessInstanceResponse, responseObserver);
    }

    /** Maps and sends a delete-resource request through the broker client. */
    public void deleteResource(GatewayOuterClass.DeleteResourceRequest request, ServerStreamObserver<GatewayOuterClass.DeleteResourceResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toDeleteResourceRequest, ResponseMapper::toDeleteResourceResponse, responseObserver);
    }

    /** Maps and sends a broadcast-signal request through the broker client. */
    public void broadcastSignal(GatewayOuterClass.BroadcastSignalRequest request, ServerStreamObserver<GatewayOuterClass.BroadcastSignalResponse> responseObserver) {
        this.sendRequest(request, RequestMapper::toBroadcastSignalRequest, ResponseMapper::toBroadcastSignalResponse, responseObserver);
    }

    /**
     * Converts the gRPC request and sends it via {@link BrokerClient#sendRequestWithRetry}.
     *
     * <p>If the conversion throws, the exception is reported through
     * {@code streamObserver.onError} and nothing is sent.
     *
     * @param grpcRequest incoming gRPC request
     * @param requestMapper converts the gRPC request into a broker request
     * @param responseMapper converts the broker response into the gRPC response
     * @param streamObserver receives the mapped response or the error
     */
    private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequest(GrpcRequestT grpcRequest, Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> requestMapper, ResponseMapper.BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper, ServerStreamObserver<GrpcResponseT> streamObserver) {
        BrokerRequest<BrokerResponseT> brokerRequest;
        try {
            brokerRequest = requestMapper.apply(grpcRequest);
        } catch (Exception e) {
            streamObserver.onError(e);
            return;
        }
        this.brokerClient.sendRequestWithRetry(
            brokerRequest,
            (key, response) -> this.consumeResponse(responseMapper, streamObserver, key, response),
            streamObserver::onError);
    }

    /**
     * Converts the gRPC request and sends it via the {@link RequestRetryHandler}, without an
     * explicit timeout.
     *
     * <p>If the conversion throws, the exception is reported through
     * {@code streamObserver.onError} and nothing is sent.
     *
     * @param grpcRequest incoming gRPC request
     * @param requestMapper converts the gRPC request into a broker request
     * @param responseMapper converts the broker response into the gRPC response
     * @param streamObserver receives the mapped response or the error
     */
    private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequestWithRetryPartitions(GrpcRequestT grpcRequest, Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> requestMapper, ResponseMapper.BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper, ServerStreamObserver<GrpcResponseT> streamObserver) {
        BrokerRequest<BrokerResponseT> brokerRequest;
        try {
            brokerRequest = requestMapper.apply(grpcRequest);
        } catch (Exception e) {
            streamObserver.onError(e);
            return;
        }
        this.requestRetryHandler.sendRequest(
            brokerRequest,
            (key, response) -> this.consumeResponse(responseMapper, streamObserver, key, response),
            streamObserver::onError);
    }

    /**
     * Converts the gRPC request and sends it via the {@link RequestRetryHandler} with the given
     * timeout.
     *
     * <p>If the conversion throws, the exception is reported through
     * {@code streamObserver.onError} and nothing is sent.
     *
     * @param grpcRequest incoming gRPC request
     * @param requestMapper converts the gRPC request into a broker request
     * @param responseMapper converts the broker response into the gRPC response
     * @param streamObserver receives the mapped response or the error
     * @param timeout timeout handed to the retry handler
     */
    private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequestWithRetryPartitions(GrpcRequestT grpcRequest, Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> requestMapper, ResponseMapper.BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper, ServerStreamObserver<GrpcResponseT> streamObserver, Duration timeout) {
        BrokerRequest<BrokerResponseT> brokerRequest;
        try {
            brokerRequest = requestMapper.apply(grpcRequest);
        } catch (Exception e) {
            streamObserver.onError(e);
            return;
        }
        this.requestRetryHandler.sendRequest(
            brokerRequest,
            (key, response) -> this.consumeResponse(responseMapper, streamObserver, key, response),
            streamObserver::onError,
            timeout);
    }

    /**
     * Maps a broker response to its gRPC counterpart, emits it and completes the stream.
     *
     * @param responseMapper converts the broker response into the gRPC response
     * @param streamObserver observer that receives the response followed by completion
     * @param key key passed along with the broker response
     * @param response broker response to convert
     */
    private <BrokerResponseT, GrpcResponseT> void consumeResponse(ResponseMapper.BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper, ServerStreamObserver<GrpcResponseT> streamObserver, long key, BrokerResponseT response) {
        GrpcResponseT grpcResponse = responseMapper.apply(key, response);
        streamObserver.onNext(grpcResponse);
        streamObserver.onCompleted();
    }
}

