/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.Message;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.RpcServer;
import com.rabbitmq.client.amqp.impl.AmqpConnection;
import com.rabbitmq.client.amqp.impl.RetryUtils;
import com.rabbitmq.client.amqp.impl.RpcSupport;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RpcServer} implementation that consumes requests from a queue, delegates them to a
 * {@link RpcServer.Handler} and publishes the reply (if any) with a dedicated {@link Publisher}.
 *
 * <p>The consumer and the publisher are both created from the connection supplied by the
 * builder and are closed by {@link #close()}.
 */
class AmqpRpcServer
implements RpcServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcServer.class);
    /** Publish callback that ignores the outcome of the reply publishing. */
    private static final Publisher.Callback NO_OP_CALLBACK = ctx -> {};
    /**
     * Selects the exceptions passed to the retry helper as the retry condition: invalid-state
     * exceptions, except the "resource closed" ones.
     */
    private static final Predicate<Exception> RESPONSE_SENDING_EXCEPTION_PREDICATE = ex -> ex instanceof AmqpException.AmqpResourceInvalidStateException && !(ex instanceof AmqpException.AmqpResourceClosedException);
    /** Wait times handed to the retry helper when sending a reply. */
    private static final List<Duration> RESPONSE_SENDING_RETRY_WAIT_TIMES = List.of(Duration.ofSeconds(1L), Duration.ofSeconds(3L), Duration.ofSeconds(5L), Duration.ofSeconds(10L));
    private final AmqpConnection connection;
    private final Publisher publisher;
    private final Consumer consumer;
    /** Extracts the correlation ID from a request; defaults to the request message ID. */
    private final Function<Message, Object> correlationIdExtractor;
    /** Last transformation applied to the reply (given the correlation ID) before sending. */
    private final BiFunction<Message, Object, Message> replyPostProcessor;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates the server and starts consuming from the builder's request queue.
     *
     * <p>For each request, the delivery is accepted, the handler is invoked, the reply address
     * is copied from the request's {@code replyTo} (when both the reply and the address are
     * non-null), the reply post-processor is applied, and the resulting message is published
     * unless it is {@code null}.
     *
     * @param builder source of the connection, handler, request queue and optional
     *     correlation ID extractor / reply post-processor
     */
    AmqpRpcServer(RpcSupport.AmqpRpcServerBuilder builder) {
        this.connection = builder.connection();
        RpcServer.Handler handler = builder.handler();
        this.publisher = this.connection.publisherBuilder().build();
        // The handler creates its reply messages through the server's publisher.
        RpcServer.Context context = new RpcServer.Context(){

            @Override
            public Message message() {
                return AmqpRpcServer.this.publisher.message();
            }

            @Override
            public Message message(byte[] body) {
                return AmqpRpcServer.this.publisher.message(body);
            }
        };
        this.correlationIdExtractor = builder.correlationIdExtractor() == null
                ? Message::messageId
                : builder.correlationIdExtractor();
        // The default post-processor must tolerate a null reply: the handler may return null
        // to indicate there is nothing to send, and an unbound Message::correlationId
        // reference would throw a NullPointerException in that case.
        this.replyPostProcessor = builder.replyPostProcessor() == null
                ? AmqpRpcServer::setCorrelationIdIfPresent
                : builder.replyPostProcessor();
        this.consumer = this.connection.consumerBuilder().queue(builder.requestQueue()).messageHandler((ctx, msg) -> {
            // The delivery is accepted before the handler runs.
            ctx.accept();
            Message reply = handler.handle(context, msg);
            if (reply != null && msg.replyTo() != null) {
                reply.to(msg.replyTo());
            }
            Object correlationId = this.correlationIdExtractor.apply(msg);
            // A user-supplied post-processor is still invoked with a null reply, as before.
            reply = this.replyPostProcessor.apply(reply, correlationId);
            if (reply != null) {
                this.sendReply(reply);
            }
        }).build();
    }

    /**
     * Closes the server: unregisters it from the connection, then closes the consumer and the
     * publisher. Only the first call has an effect. A failure to close one resource is logged
     * at WARN level and does not prevent the other from being closed.
     */
    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.connection.removeRpcServer(this);
        try {
            this.consumer.close();
        }
        catch (Exception e) {
            LOGGER.warn("Error while closing RPC server consumer: {}", e.getMessage());
        }
        try {
            this.publisher.close();
        }
        catch (Exception e) {
            LOGGER.warn("Error while closing RPC server publisher: {}", e.getMessage());
        }
    }

    /**
     * Default reply post-processor: sets the correlation ID on the reply.
     *
     * @param reply the reply returned by the handler, possibly {@code null}
     * @param correlationId the correlation ID extracted from the request
     * @return the result of {@code reply.correlationId(correlationId)}, or {@code null} when
     *     there is no reply
     */
    private static Message setCorrelationIdIfPresent(Message reply, Object correlationId) {
        return reply == null ? null : reply.correlationId(correlationId);
    }

    /**
     * Publishes the reply through {@link RetryUtils#callAndMaybeRetry}, passing the
     * response-sending retry predicate and wait times. Any exception that escapes the retry
     * helper is logged at INFO level and not rethrown.
     *
     * @param reply the message to publish, must not be {@code null}
     */
    private void sendReply(Message reply) {
        try {
            RetryUtils.callAndMaybeRetry(() -> {
                this.publisher.publish(reply, NO_OP_CALLBACK);
                return null;
            }, RESPONSE_SENDING_EXCEPTION_PREDICATE, RESPONSE_SENDING_RETRY_WAIT_TIMES, "RPC Server Response", new Object[0]);
        }
        catch (Exception e) {
            LOGGER.info("Error while processing RPC request: {}", e.getMessage());
        }
    }
}

