/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.transport.queryapi;

import io.camunda.zeebe.broker.system.configuration.QueryApiCfg;
import io.camunda.zeebe.broker.transport.queryapi.QueryApiRequestHandler;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.protocol.impl.encoding.ErrorResponse;
import io.camunda.zeebe.protocol.impl.encoding.ExecuteQueryRequest;
import io.camunda.zeebe.protocol.impl.encoding.ExecuteQueryResponse;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.test.util.asserts.EitherAssert;
import io.camunda.zeebe.transport.ServerOutput;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.mockito.Mockito;

/**
 * Tests the responses produced by {@link QueryApiRequestHandler}.
 *
 * <p>Each test creates a handler, submits it to the shared {@link ActorScheduler}, sends a single
 * {@link ExecuteQueryRequest} through {@link AsyncExecuteQueryRequestSender} and asserts on the
 * decoded {@link ErrorResponse} (left) or {@link ExecuteQueryResponse} (right).
 */
@Execution(ExecutionMode.CONCURRENT)
final class QueryApiRequestHandlerTest {
    /** Scheduler the handlers under test are submitted to; started before and closed after each test. */
    private final ActorScheduler scheduler =
        ActorScheduler.newActorScheduler()
            .setCpuBoundActorThreadCount(2)
            .setIoBoundActorThreadCount(2)
            .build();

    @BeforeEach
    void beforeEach() {
        this.scheduler.start();
    }

    @AfterEach
    void afterEach() throws Exception {
        this.scheduler.close();
    }

    @DisplayName("should respond with UNSUPPORTED_MESSAGE when QueryApi is disabled")
    @Test
    void disabledQueryApi() {
        // given
        final QueryApiRequestHandler sut = this.createQueryApiRequestHandler(false);

        // when
        final Either<ErrorResponse, ExecuteQueryResponse> response =
            new AsyncExecuteQueryRequestSender(sut).sendRequest(new ExecuteQueryRequest()).join();

        // then
        EitherAssert.assertThat(response)
            .isLeft()
            .extracting(Either::getLeft)
            .extracting(
                ErrorResponse::getErrorCode,
                error -> BufferUtil.bufferAsString(error.getErrorData()))
            .containsExactly(
                ErrorCode.UNSUPPORTED_MESSAGE,
                "Failed to handle query as the query API is disabled; did you configure zeebe.broker.experimental.queryapi.enabled?");
    }

    @DisplayName("should respond with PARTITION_LEADER_MISMATCH when no service is registered")
    @Test
    void noQueryServiceForPartition() {
        // given - a service is registered for partition 1 only
        final QueryApiRequestHandler sut = this.createQueryApiRequestHandler(true);
        sut.addPartition(1, Mockito.mock(QueryService.class));

        // when - the request targets a different partition
        final Either<ErrorResponse, ExecuteQueryResponse> response =
            new AsyncExecuteQueryRequestSender(sut)
                .sendRequest(new ExecuteQueryRequest().setPartitionId(9999))
                .join();

        // then
        EitherAssert.assertThat(response)
            .isLeft()
            .extracting(Either::getLeft)
            .extracting(
                ErrorResponse::getErrorCode,
                error -> BufferUtil.bufferAsString(error.getErrorData()))
            .containsExactly(
                ErrorCode.PARTITION_LEADER_MISMATCH,
                "Expected to handle client message on the leader of partition '9999', but this node is not the leader for it");
    }

    /**
     * Registers a {@link QueryService} mock whose every call throws {@link
     * QueryService.ClosedServiceException} and expects a PARTITION_LEADER_MISMATCH error.
     *
     * <p>NOTE(review): the throwing mock is registered for partition 1, but the request targets
     * partition 9999 and the expected message names partition '9999' - the same setup as {@link
     * #noQueryServiceForPartition()}. It looks like the mock is never consulted, so the "closed
     * service" path may not actually be exercised here; confirm against the handler whether the
     * request should target partition 1 (with a value type set) instead.
     */
    @DisplayName("should respond with PARTITION_LEADER_MISMATCH when the service is closed")
    @Test
    void closedQueryService() {
        // given
        final QueryApiRequestHandler sut = this.createQueryApiRequestHandler(true);
        sut.addPartition(
            1,
            Mockito.mock(
                QueryService.class,
                invocation -> {
                    throw new QueryService.ClosedServiceException();
                }));

        // when
        final Either<ErrorResponse, ExecuteQueryResponse> response =
            new AsyncExecuteQueryRequestSender(sut)
                .sendRequest(new ExecuteQueryRequest().setPartitionId(9999))
                .join();

        // then
        EitherAssert.assertThat(response)
            .isLeft()
            .extracting(Either::getLeft)
            .extracting(
                ErrorResponse::getErrorCode,
                error -> BufferUtil.bufferAsString(error.getErrorData()))
            .containsExactly(
                ErrorCode.PARTITION_LEADER_MISMATCH,
                "Expected to handle client message on the leader of partition '9999', but this node is not the leader for it");
    }

    @DisplayName("should respond with PROCESS_NOT_FOUND when no process with key exists")
    @Test
    void processNotFound() {
        // given - an unstubbed mock is registered for partition 1
        final QueryApiRequestHandler sut = this.createQueryApiRequestHandler(true);
        sut.addPartition(1, Mockito.mock(QueryService.class));

        // when
        final Either<ErrorResponse, ExecuteQueryResponse> response =
            new AsyncExecuteQueryRequestSender(sut)
                .sendRequest(
                    new ExecuteQueryRequest()
                        .setPartitionId(1)
                        .setKey(1L)
                        .setValueType(ValueType.PROCESS))
                .join();

        // then
        EitherAssert.assertThat(response)
            .isLeft()
            .extracting(Either::getLeft)
            .extracting(
                ErrorResponse::getErrorCode,
                error -> BufferUtil.bufferAsString(error.getErrorData()))
            .containsExactly(
                ErrorCode.PROCESS_NOT_FOUND,
                "Expected to find the process ID for resource of type PROCESS with key 1, but no such resource was found");
    }

    @DisplayName("should respond with PROCESS_NOT_FOUND when no process instance with key exists")
    @Test
    void processInstanceNotFound() {
        // given - an unstubbed mock is registered for partition 1
        final QueryApiRequestHandler sut = this.createQueryApiRequestHandler(true);
        sut.addPartition(1, Mockito.mock(QueryService.class));

        // when
        final Either<ErrorResponse, ExecuteQueryResponse> response =
            new AsyncExecuteQueryRequestSender(sut)
                .sendRequest(
                    new ExecuteQueryRequest()
                        .setPartitionId(1)
                        .setKey(1L)
                        .setValueType(ValueType.PROCESS_INSTANCE))
                .join();

        // then
        EitherAssert.assertThat(response)
            .isLeft()
            .extracting(Either::getLeft)
            .extracting(
                ErrorResponse::getErrorCode,
                error -> BufferUtil.bufferAsString(error.getErrorData()))
            .containsExactly(
                ErrorCode.PROCESS_NOT_FOUND,
                "Expected to find the process ID for resource of type PROCESS_INSTANCE with key 1, but no such resource was found");
    }

    @DisplayName("should respond with PROCESS_NOT_FOUND when no job with key exists")
    @Test
    void jobNotFound() {
        // given - an unstubbed mock is registered for partition 1
        final QueryApiRequestHandler sut = this.createQueryApiRequestHandler(true);
        sut.addPartition(1, Mockito.mock(QueryService.class));

        // when
        final Either<ErrorResponse, ExecuteQueryResponse> response =
            new AsyncExecuteQueryRequestSender(sut)
                .sendRequest(
                    new ExecuteQueryRequest()
                        .setPartitionId(1)
                        .setKey(1L)
                        .setValueType(ValueType.JOB))
                .join();

        // then
        EitherAssert.assertThat(response)
            .isLeft()
            .extracting(Either::getLeft)
            .extracting(
                ErrorResponse::getErrorCode,
                error -> BufferUtil.bufferAsString(error.getErrorData()))
            .containsExactly(
                ErrorCode.PROCESS_NOT_FOUND,
                "Expected to find the process ID for resource of type JOB with key 1, but no such resource was found");
    }

    @DisplayName("should respond with bpmnProcessId when process found")
    @Test
    void processFound() throws QueryService.ClosedServiceException {
        // given - the service for partition 1 knows the process with key 1
        final QueryApiRequestHandler sut = this.createQueryApiRequestHandler(true);
        final DirectBuffer bpmnProcessId = BufferUtil.wrapString("OneProcessToFindThem");
        final QueryService queryService = Mockito.mock(QueryService.class);
        sut.addPartition(1, queryService);
        Mockito.when(queryService.getBpmnProcessIdForProcess(1L))
            .thenReturn(Optional.of(bpmnProcessId));

        // when
        final Either<ErrorResponse, ExecuteQueryResponse> response =
            new AsyncExecuteQueryRequestSender(sut)
                .sendRequest(
                    new ExecuteQueryRequest()
                        .setPartitionId(1)
                        .setKey(1L)
                        .setValueType(ValueType.PROCESS))
                .join();

        // then
        EitherAssert.assertThat(response)
            .isRight()
            .extracting(Either::get)
            .extracting(ExecuteQueryResponse::getBpmnProcessId)
            .isEqualTo("OneProcessToFindThem");
    }

    @DisplayName("should respond with bpmnProcessId when job found")
    @Test
    void jobFound() throws QueryService.ClosedServiceException {
        // given - the service for partition 1 knows the job with key 1
        final QueryApiRequestHandler sut = this.createQueryApiRequestHandler(true);
        final DirectBuffer bpmnProcessId = BufferUtil.wrapString("OneProcessToFindThem");
        final QueryService queryService = Mockito.mock(QueryService.class);
        sut.addPartition(1, queryService);
        Mockito.when(queryService.getBpmnProcessIdForJob(1L))
            .thenReturn(Optional.of(bpmnProcessId));

        // when
        final Either<ErrorResponse, ExecuteQueryResponse> response =
            new AsyncExecuteQueryRequestSender(sut)
                .sendRequest(
                    new ExecuteQueryRequest()
                        .setPartitionId(1)
                        .setKey(1L)
                        .setValueType(ValueType.JOB))
                .join();

        // then
        EitherAssert.assertThat(response)
            .isRight()
            .extracting(Either::get)
            .extracting(ExecuteQueryResponse::getBpmnProcessId)
            .isEqualTo("OneProcessToFindThem");
    }

    @DisplayName("should respond with bpmnProcessId when process instance found")
    @Test
    void processInstanceFound() throws QueryService.ClosedServiceException {
        // given - the service for partition 1 knows the process instance with key 1
        final QueryApiRequestHandler sut = this.createQueryApiRequestHandler(true);
        final DirectBuffer bpmnProcessId = BufferUtil.wrapString("OneProcessToFindThem");
        final QueryService queryService = Mockito.mock(QueryService.class);
        sut.addPartition(1, queryService);
        Mockito.when(queryService.getBpmnProcessIdForProcessInstance(1L))
            .thenReturn(Optional.of(bpmnProcessId));

        // when
        final Either<ErrorResponse, ExecuteQueryResponse> response =
            new AsyncExecuteQueryRequestSender(sut)
                .sendRequest(
                    new ExecuteQueryRequest()
                        .setPartitionId(1)
                        .setKey(1L)
                        .setValueType(ValueType.PROCESS_INSTANCE))
                .join();

        // then
        EitherAssert.assertThat(response)
            .isRight()
            .extracting(Either::get)
            .extracting(ExecuteQueryResponse::getBpmnProcessId)
            .isEqualTo("OneProcessToFindThem");
    }

    @DisplayName("should return MALFORMED_REQUEST on exception thrown while reading the request")
    @Test
    void malformedRequest() {
        // given
        final QueryApiRequestHandler sut = this.createQueryApiRequestHandler(true);

        // when
        final Either<ErrorResponse, ExecuteQueryResponse> response =
            new AsyncExecuteQueryRequestSender(sut).sendExplodingRequest().join();

        // then - only the error code is pinned (contains, not containsExactly); the message is not
        EitherAssert.assertThat(response)
            .isLeft()
            .extracting(Either::getLeft)
            .extracting(
                ErrorResponse::getErrorCode,
                error -> BufferUtil.bufferAsString(error.getErrorData()))
            .contains(ErrorCode.MALFORMED_REQUEST);
    }

    /**
     * Creates a handler with the query API switched on or off and submits it to the scheduler.
     *
     * @param enabled value passed to {@link QueryApiCfg#setEnabled}
     * @return the handler, already submitted to {@link #scheduler}
     */
    private QueryApiRequestHandler createQueryApiRequestHandler(boolean enabled) {
        final QueryApiCfg config = new QueryApiCfg();
        config.setEnabled(enabled);
        final QueryApiRequestHandler requestHandler = new QueryApiRequestHandler(config);
        this.scheduler.submitActor(requestHandler);
        return requestHandler;
    }

    /**
     * Test helper that hands requests to a {@link QueryApiRequestHandler} via {@code onRequest} and
     * exposes whatever the handler writes to the {@link ServerOutput} as a future.
     */
    private static final class AsyncExecuteQueryRequestSender {
        private final QueryApiRequestHandler sut;
        /** Used as the request id; incremented after every request sent through this sender. */
        private int requestCount = 0;

        public AsyncExecuteQueryRequestSender(QueryApiRequestHandler requestHandler) {
            this.sut = requestHandler;
        }

        /**
         * Serializes the given request and passes it to the handler, using the request's own
         * partition id as the target partition.
         *
         * @param queryRequest the request to send
         * @return a future completed with the decoded error (left) or query response (right)
         */
        private CompletableFuture<Either<ErrorResponse, ExecuteQueryResponse>> sendRequest(
                ExecuteQueryRequest queryRequest) {
            final CompletableFuture<Either<ErrorResponse, ExecuteQueryResponse>> future =
                new CompletableFuture<>();
            final ServerOutput serverOutput = this.createServerOutput(future);
            final int partitionId = queryRequest.getPartitionId();
            final DirectBuffer request = BufferUtil.createCopy(queryRequest);
            this.sut.onRequest(
                serverOutput, partitionId, this.requestCount++, request, 0, request.capacity());
            return future;
        }

        /**
         * Passes an unwrapped, empty {@link UnsafeBuffer} with a length of {@code -1} to the handler
         * for partition 1, i.e. a request that is not a valid serialized query.
         *
         * @return a future completed with the decoded error (left) or query response (right)
         */
        private CompletableFuture<Either<ErrorResponse, ExecuteQueryResponse>> sendExplodingRequest() {
            final CompletableFuture<Either<ErrorResponse, ExecuteQueryResponse>> future =
                new CompletableFuture<>();
            final ServerOutput serverOutput = this.createServerOutput(future);
            final UnsafeBuffer request = new UnsafeBuffer();
            this.sut.onRequest(serverOutput, 1, this.requestCount++, request, 0, -1);
            return future;
        }

        /**
         * Builds a {@link ServerOutput} that copies the written response into a buffer and completes
         * the given future with it.
         *
         * @param future completed with {@code Either.left} when the bytes can be wrapped as an
         *     {@link ErrorResponse}, with {@code Either.right} when they are read as an {@link
         *     ExecuteQueryResponse}, or exceptionally when wrapping the query response throws
         * @return the capturing server output
         */
        private ServerOutput createServerOutput(
                CompletableFuture<Either<ErrorResponse, ExecuteQueryResponse>> future) {
            return serverResponse -> {
                final ExpandableArrayBuffer buffer = new ExpandableArrayBuffer();
                serverResponse.write(buffer, 0);

                // Try the error format first; anything else is treated as a query response.
                final ErrorResponse error = new ErrorResponse();
                if (error.tryWrap(buffer)) {
                    error.wrap(buffer, 0, serverResponse.getLength());
                    future.complete(Either.left(error));
                    return;
                }

                final ExecuteQueryResponse response = new ExecuteQueryResponse();
                try {
                    response.wrap(buffer, 0, serverResponse.getLength());
                    future.complete(Either.right(response));
                } catch (Exception e) {
                    // Surface decoding problems to the waiting test instead of losing them.
                    future.completeExceptionally(e);
                }
            };
        }
    }
}

