/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.gateway.impl.broker;

import io.camunda.zeebe.broker.client.api.BrokerClient;
import io.camunda.zeebe.broker.client.api.BrokerClusterState;
import io.camunda.zeebe.broker.client.api.BrokerErrorException;
import io.camunda.zeebe.broker.client.api.BrokerResponseConsumer;
import io.camunda.zeebe.broker.client.api.BrokerTopologyManager;
import io.camunda.zeebe.broker.client.api.NoTopologyAvailableException;
import io.camunda.zeebe.broker.client.api.RequestDispatchStrategy;
import io.camunda.zeebe.broker.client.api.RequestRetriesExhaustedException;
import io.camunda.zeebe.broker.client.api.dto.BrokerRequest;
import io.camunda.zeebe.broker.client.api.dto.BrokerResponse;
import io.camunda.zeebe.broker.client.impl.PartitionIdIterator;
import io.camunda.zeebe.protocol.record.ErrorCode;
import java.net.ConnectException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RequestRetryHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(RequestRetryHandler.class);
    private final RequestDispatchStrategy roundRobinDispatchStrategy = RequestDispatchStrategy.roundRobin();
    private final BrokerClient brokerClient;
    private final BrokerTopologyManager topologyManager;

    public RequestRetryHandler(BrokerClient brokerClient, BrokerTopologyManager topologyManager) {
        this.brokerClient = brokerClient;
        this.topologyManager = topologyManager;
    }

    public <BrokerResponseT> void sendRequest(BrokerRequest<BrokerResponseT> request, BrokerResponseConsumer<BrokerResponseT> responseConsumer, Consumer<Throwable> throwableConsumer) {
        Function<BrokerRequest<BrokerResponseT>, CompletableFuture<BrokerResponse<BrokerResponseT>>> requestSender = arg_0 -> ((BrokerClient)this.brokerClient).sendRequest(arg_0);
        this.sendRequestInternal(request, requestSender, responseConsumer, throwableConsumer);
    }

    public <BrokerResponseT> void sendRequest(BrokerRequest<BrokerResponseT> request, BrokerResponseConsumer<BrokerResponseT> responseConsumer, Consumer<Throwable> throwableConsumer, Duration requestTimeout) {
        Function<BrokerRequest<BrokerResponseT>, CompletableFuture<BrokerResponse<BrokerResponseT>>> requestSender = r -> this.brokerClient.sendRequest(r, requestTimeout);
        this.sendRequestInternal(request, requestSender, responseConsumer, throwableConsumer);
    }

    private <BrokerResponseT> void sendRequestInternal(BrokerRequest<BrokerResponseT> request, Function<BrokerRequest<BrokerResponseT>, CompletableFuture<BrokerResponse<BrokerResponseT>>> requestSender, BrokerResponseConsumer<BrokerResponseT> responseConsumer, Consumer<Throwable> throwableConsumer) {
        BrokerClusterState topology = this.topologyManager.getTopology();
        if (topology == null || topology.getPartitionsCount() == 0) {
            throwableConsumer.accept((Throwable)new NoTopologyAvailableException());
            return;
        }
        this.sendRequestWithRetry(request, requestSender, this.partitionIdIteratorForType(topology.getPartitionsCount()), responseConsumer, throwableConsumer, new ArrayList<Throwable>());
    }

    private <BrokerResponseT> void sendRequestWithRetry(BrokerRequest<BrokerResponseT> request, Function<BrokerRequest<BrokerResponseT>, CompletableFuture<BrokerResponse<BrokerResponseT>>> requestSender, PartitionIdIterator partitionIdIterator, BrokerResponseConsumer<BrokerResponseT> responseConsumer, Consumer<Throwable> throwableConsumer, Collection<Throwable> errors) {
        if (partitionIdIterator.hasNext()) {
            int partitionId = partitionIdIterator.next();
            request.setPartitionId(partitionId);
            requestSender.apply(request).whenComplete((response, error) -> {
                if (error == null) {
                    responseConsumer.accept(response.getKey(), response.getResponse());
                } else if (this.shouldRetryWithNextPartition((Throwable)error)) {
                    LOGGER.trace("Failed to create process on partition {}", (Object)partitionIdIterator.getCurrentPartitionId(), error);
                    errors.add((Throwable)error);
                    this.sendRequestWithRetry(request, requestSender, partitionIdIterator, responseConsumer, throwableConsumer, errors);
                } else {
                    throwableConsumer.accept((Throwable)error);
                }
            });
        } else {
            RequestRetriesExhaustedException exception = new RequestRetriesExhaustedException();
            errors.forEach(arg_0 -> exception.addSuppressed(arg_0));
            throwableConsumer.accept((Throwable)exception);
        }
    }

    private boolean shouldRetryWithNextPartition(Throwable error) {
        if (error instanceof ConnectException) {
            return true;
        }
        if (error instanceof BrokerErrorException) {
            ErrorCode code = ((BrokerErrorException)error).getError().getCode();
            return code == ErrorCode.PARTITION_LEADER_MISMATCH || code == ErrorCode.RESOURCE_EXHAUSTED;
        }
        return false;
    }

    private PartitionIdIterator partitionIdIteratorForType(int partitionsCount) {
        int nextPartitionId = this.roundRobinDispatchStrategy.determinePartition(this.topologyManager);
        return new PartitionIdIterator(nextPartitionId, partitionsCount, this.topologyManager);
    }
}

