/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.gateway.admin.backup;

import io.camunda.zeebe.gateway.admin.IncompleteTopologyException;
import io.camunda.zeebe.gateway.admin.backup.BackupAlreadyExistException;
import io.camunda.zeebe.gateway.admin.backup.BackupApi;
import io.camunda.zeebe.gateway.admin.backup.BackupDeleteRequest;
import io.camunda.zeebe.gateway.admin.backup.BackupListRequest;
import io.camunda.zeebe.gateway.admin.backup.BackupResponse;
import io.camunda.zeebe.gateway.admin.backup.BackupStatus;
import io.camunda.zeebe.gateway.admin.backup.BackupStatusRequest;
import io.camunda.zeebe.gateway.admin.backup.BrokerBackupRequest;
import io.camunda.zeebe.gateway.admin.backup.PartitionBackupStatus;
import io.camunda.zeebe.gateway.admin.backup.State;
import io.camunda.zeebe.gateway.cmd.NoTopologyAvailableException;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.camunda.zeebe.protocol.impl.encoding.BackupListResponse;
import io.camunda.zeebe.protocol.impl.encoding.BackupStatusResponse;
import io.camunda.zeebe.protocol.management.BackupStatusCode;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class BackupRequestHandler
implements BackupApi {
    final BrokerClient brokerClient;
    final BrokerTopologyManager topologyManager;

    public BackupRequestHandler(BrokerClient brokerClient) {
        this.brokerClient = brokerClient;
        this.topologyManager = brokerClient.getTopologyManager();
    }

    @Override
    public CompletionStage<Long> takeBackup(long backupId) {
        return this.checkTopologyComplete().thenCompose(topology -> {
            List<CompletableFuture> backupsTaken = topology.getPartitions().stream().map(partitionId -> BackupRequestHandler.createBackupRequest(backupId, partitionId)).map(this.brokerClient::sendRequestWithRetry).toList();
            return CompletableFuture.allOf((CompletableFuture[])backupsTaken.toArray(CompletableFuture[]::new)).thenApply(ignore -> {
                BackupResponse aggregatedResponse = backupsTaken.stream().map(response -> (BackupResponse)((BrokerResponse)response.join()).getResponse()).distinct().reduce((r1, r2) -> new BackupResponse(r1.created() || r2.created(), Long.max(r1.checkpointId(), r2.checkpointId()))).orElseThrow();
                if (aggregatedResponse.created() && aggregatedResponse.checkpointId() == backupId) {
                    return backupId;
                }
                throw new BackupAlreadyExistException(backupId, aggregatedResponse.checkpointId());
            });
        });
    }

    @Override
    public CompletionStage<BackupStatus> getStatus(long backupId) {
        return this.checkTopologyComplete().thenCompose(topology -> {
            List<CompletableFuture> statusesReceived = topology.getPartitions().stream().map(partitionId -> this.createStatusQueryRequest(backupId, (int)partitionId)).map(this.brokerClient::sendRequestWithRetry).toList();
            return CompletableFuture.allOf((CompletableFuture[])statusesReceived.toArray(CompletableFuture[]::new)).thenApply(ignore -> {
                List<PartitionBackupStatus> partitionStatuses = statusesReceived.stream().map(response -> (BackupStatusResponse)((BrokerResponse)response.join()).getResponse()).map(PartitionBackupStatus::from).toList();
                return this.aggregatePartitionStatus(backupId, partitionStatuses);
            });
        });
    }

    @Override
    public CompletionStage<List<BackupStatus>> listBackups() {
        return this.checkTopologyComplete().thenCompose(topology -> {
            List<CompletableFuture> backupsReceived = topology.getPartitions().stream().map(this::createListRequest).map(this.brokerClient::sendRequestWithRetry).toList();
            return CompletableFuture.allOf((CompletableFuture[])backupsReceived.toArray(CompletableFuture[]::new)).thenApply(ignore -> this.aggregateBackupList(backupsReceived));
        });
    }

    @Override
    public CompletionStage<Void> deleteBackup(long backupId) {
        return this.checkTopologyComplete().thenCompose(topology -> CompletableFuture.allOf((CompletableFuture[])topology.getPartitions().stream().map(partitionId -> this.createDeleteRequest(backupId, (Integer)partitionId)).map(this.brokerClient::sendRequestWithRetry).toArray(CompletableFuture[]::new)));
    }

    private List<BackupStatus> aggregateBackupList(List<CompletableFuture<BrokerResponse<BackupListResponse>>> backupsReceived) {
        Map statusByBackupAndPartition = backupsReceived.stream().map(f -> (BackupListResponse)((BrokerResponse)f.join()).getResponse()).flatMap(backupListResponse -> backupListResponse.getBackups().stream()).collect(Collectors.groupingBy(BackupListResponse.BackupStatus::backupId, Collectors.toMap(BackupListResponse.BackupStatus::partitionId, Function.identity(), this::mergeDuplicatePartitionBackupStatus)));
        List<Integer> partitions = this.topologyManager.getTopology().getPartitions();
        return statusByBackupAndPartition.entrySet().stream().map(entry -> {
            Long backupId = (Long)entry.getKey();
            Map statusByPartition = (Map)entry.getValue();
            return this.aggregatePartitionStatus(backupId, partitions.stream().map(partitionId -> {
                if (!statusByPartition.containsKey(partitionId)) {
                    return PartitionBackupStatus.notExistingStatus(partitionId);
                }
                BackupListResponse.BackupStatus status = (BackupListResponse.BackupStatus)statusByPartition.get(partitionId);
                return new PartitionBackupStatus(status.partitionId(), status.status(), status.status() == BackupStatusCode.FAILED ? Optional.ofNullable(status.failureReason()) : Optional.empty(), Optional.ofNullable(status.createdAt()), Optional.empty(), Optional.empty(), OptionalLong.empty(), OptionalInt.empty(), Optional.ofNullable(status.brokerVersion()));
            }).toList());
        }).toList();
    }

    private BackupListResponse.BackupStatus mergeDuplicatePartitionBackupStatus(BackupListResponse.BackupStatus x, BackupListResponse.BackupStatus y) {
        if (x.partitionId() != y.partitionId()) {
            throw new IllegalArgumentException("Expected to merge backup status from same partitions, but provided backups of different partitions. Provided backups : %s, %s".formatted(x, y));
        }
        List<BackupStatusCode> comparingOrder = List.of(BackupStatusCode.SBE_UNKNOWN, BackupStatusCode.DOES_NOT_EXIST, BackupStatusCode.FAILED, BackupStatusCode.IN_PROGRESS, BackupStatusCode.COMPLETED);
        return Collections.max(List.of(x, y), Comparator.comparing(BackupListResponse.BackupStatus::status, Comparator.comparingInt(comparingOrder::indexOf)));
    }

    private CompletionStage<BrokerClusterState> checkTopologyComplete() {
        int knownPartitions;
        BrokerClusterState topology = this.topologyManager.getTopology();
        if (topology == null) {
            return CompletableFuture.failedFuture(new NoTopologyAvailableException());
        }
        int expectedPartitionCount = topology.getPartitionsCount();
        if (expectedPartitionCount != (knownPartitions = topology.getPartitions().size())) {
            return CompletableFuture.failedFuture(new IncompleteTopologyException("Expected to send request to all %d partitions, but found only %d partitions in topology.".formatted(expectedPartitionCount, knownPartitions)));
        }
        return CompletableFuture.completedFuture(topology);
    }

    private BackupStatus aggregatePartitionStatus(long backupId, List<PartitionBackupStatus> partitionStatuses) {
        State combinedStatus = this.getAggregatedStatus(partitionStatuses);
        String failureReason = null;
        if (combinedStatus == State.FAILED) {
            failureReason = this.collectFailureReason(partitionStatuses);
        }
        return new BackupStatus(backupId, combinedStatus, Optional.ofNullable(failureReason), partitionStatuses);
    }

    private String collectFailureReason(List<PartitionBackupStatus> partitionStatuses) {
        return partitionStatuses.stream().filter(p -> p.status() == BackupStatusCode.FAILED).map(p -> {
            String reason = p.failureReason().orElse("Unknown reason");
            return "Backup on partition %d failed due to %s. ".formatted(p.partitionId(), reason);
        }).collect(Collectors.joining());
    }

    private State getAggregatedStatus(List<PartitionBackupStatus> partitionStatuses) {
        List<BackupStatusCode> statuses = partitionStatuses.stream().map(PartitionBackupStatus::status).distinct().toList();
        if (statuses.contains(BackupStatusCode.FAILED)) {
            return State.FAILED;
        }
        if ((statuses.contains(BackupStatusCode.IN_PROGRESS) || statuses.contains(BackupStatusCode.COMPLETED)) && statuses.contains(BackupStatusCode.DOES_NOT_EXIST)) {
            return State.INCOMPLETE;
        }
        if (statuses.contains(BackupStatusCode.IN_PROGRESS)) {
            return State.IN_PROGRESS;
        }
        if (statuses.contains(BackupStatusCode.DOES_NOT_EXIST)) {
            return State.DOES_NOT_EXIST;
        }
        if (statuses.size() == 1 && statuses.contains(BackupStatusCode.COMPLETED)) {
            return State.COMPLETED;
        }
        throw new IllegalStateException("Backup status cannot be calculated from status of partitions backup %s. Possible incomplete topology.".formatted(partitionStatuses));
    }

    private BackupStatusRequest createStatusQueryRequest(long backupId, int partitionId) {
        BackupStatusRequest request = new BackupStatusRequest();
        request.setBackupId(backupId);
        request.setPartitionId(partitionId);
        return request;
    }

    private static BrokerBackupRequest createBackupRequest(long backupId, int partitionId) {
        BrokerBackupRequest request = new BrokerBackupRequest();
        request.setBackupId(backupId);
        request.setPartitionId(partitionId);
        return request;
    }

    private BackupListRequest createListRequest(Integer partitionId) {
        BackupListRequest request = new BackupListRequest();
        request.setPartitionId(partitionId);
        return request;
    }

    private BackupDeleteRequest createDeleteRequest(long backupId, Integer partitionId) {
        BackupDeleteRequest request = new BackupDeleteRequest();
        request.setPartitionId(partitionId);
        request.setBackupId(backupId);
        return request;
    }
}

