/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket;

import java.io.IOException;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pulsar.shade.com.google.common.base.Enums;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.base.Splitter;
import org.apache.pulsar.shade.javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.Session;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.apache.pulsar.websocket.AbstractWebSocketHandler;
import org.apache.pulsar.websocket.WebSocketError;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.data.ConsumerCommand;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.apache.pulsar.websocket.data.EndOfTopicResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerHandler
extends AbstractWebSocketHandler {
    // Subscription name; assigned in the constructor from extractSubscription(request).
    private String subscription = null;
    // Subscription type and mode, copied in the constructor from the consumer builder's configuration.
    private SubscriptionType subscriptionType;
    private SubscriptionMode subscriptionMode;
    // Underlying Pulsar consumer; stays null when the constructor's auth check fails or subscribing throws.
    private Consumer<byte[]> consumer;
    // Threshold compared against pendingMessages in receiveMessage()/checkResumeReceive().
    // Remains 0 in pull mode; otherwise the builder's receiver queue size (or 1 when that size is 0).
    private int maxPendingMessages = 0;
    // Incremented for every message handled in receiveMessage(); decremented on write failure and on
    // ack/nack (non-pull mode), and reduced by the permit count in handlePermit() (pull mode).
    private final AtomicInteger pendingMessages = new AtomicInteger();
    // Parsed from the "pullMode" query parameter in the constructor.
    private final boolean pullMode;
    // Delivery statistics, updated in updateDeliverMsgStat() and drained by the getAndReset* accessors.
    private final LongAdder numMsgsDelivered = new LongAdder();
    private final LongAdder numBytesDelivered = new LongAdder();
    // Incremented when an acknowledgeAsync() future completes in handleAck().
    private final LongAdder numMsgsAcked = new LongAdder();
    // Delivered-message count that is only ever incremented in this class (via the updater below).
    private volatile long msgDeliveredCounter = 0L;
    private static final AtomicLongFieldUpdater<ConsumerHandler> MSG_DELIVERED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ConsumerHandler.class, "msgDeliveredCounter");
    private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class);

    /**
     * Builds the handler for one WebSocket consumer connection: resolves the subscription from the
     * request URI, derives the consumer configuration from the query parameters, runs the auth check,
     * subscribes, and registers the handler with the service.
     *
     * <p>Any exception raised along the way is logged and reported to the client through
     * {@code response.sendError(...)}; in that case {@link #getConsumer()} stays {@code null}.
     *
     * @param service  WebSocket service providing the Pulsar client and the consumer registry
     * @param request  upgrade request whose URI carries the subscription name
     * @param response upgrade response used to report failures
     */
    public ConsumerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
        super(service, request, response);
        this.pullMode = Boolean.parseBoolean((String) this.queryParams.get("pullMode"));
        try {
            // The subscription is resolved first: getConsumerConfiguration() and isAuthorized() both read it.
            this.subscription = extractSubscription(request);
            ConsumerBuilderImpl<byte[]> consumerBuilder =
                    (ConsumerBuilderImpl<byte[]>) getConsumerConfiguration(service.getPulsarClient());

            if (!this.pullMode) {
                // A receiver queue size of 0 still allows one in-flight message.
                int queueSize = consumerBuilder.getConf().getReceiverQueueSize();
                this.maxPendingMessages = queueSize == 0 ? 1 : queueSize;
            }
            this.subscriptionType = consumerBuilder.getConf().getSubscriptionType();
            this.subscriptionMode = consumerBuilder.getConf().getSubscriptionMode();

            if (!checkAuth(response)) {
                return;
            }

            this.consumer = consumerBuilder
                    .topic(this.topic.toString())
                    .subscriptionName(this.subscription)
                    .subscribe();

            boolean registered = this.service.addConsumer(this);
            if (!registered) {
                log.warn("[{}:{}] Failed to add consumer handler for topic {}",
                        request.getRemoteAddr(), request.getRemotePort(), this.topic);
            }
        } catch (Exception e) {
            log.warn("[{}:{}] Failed in creating subscription {} on topic {}",
                    request.getRemoteAddr(), request.getRemotePort(), this.subscription, this.topic, e);
            try {
                response.sendError(getErrorCode(e), getErrorMessage(e));
            } catch (IOException e1) {
                log.warn("[{}:{}] Failed to send error: {}",
                        request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1);
            }
        }
    }

    /**
     * Asynchronously receives the next message from the Pulsar consumer, converts it to a
     * {@link ConsumerMessage} JSON document and writes it to the WebSocket session.
     *
     * <p>After handing the message to the session, {@code pendingMessages} is incremented and another
     * receive is scheduled on the service executor while the count stays below
     * {@code maxPendingMessages}. A failed write decrements the count again and schedules a new
     * receive; a successful write updates the delivery statistics.
     */
    private void receiveMessage() {
        if (log.isDebugEnabled()) {
            log.debug("[{}:{}] [{}] [{}] Receive next message", new Object[]{this.request.getRemoteAddr(), this.request.getRemotePort(), this.topic, this.subscription});
        }
        this.consumer.receiveAsync().thenAccept(msg -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Got message {}", new Object[]{this.getSession().getRemoteAddress(), this.topic, this.subscription, msg.getMessageId()});
            }
            ConsumerMessage dm = new ConsumerMessage();
            dm.messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray());
            // Fetch the payload once and reuse it for both the encoded body and the size statistic.
            final byte[] payload = msg.getData();
            dm.payload = Base64.getEncoder().encodeToString(payload);
            dm.properties = msg.getProperties();
            dm.publishTime = DateFormatter.format(msg.getPublishTime());
            dm.redeliveryCount = msg.getRedeliveryCount();
            if (msg.getEventTime() != 0L) {
                dm.eventTime = DateFormatter.format(msg.getEventTime());
            }
            if (msg.hasKey()) {
                dm.key = msg.getKey();
            }
            final long msgSize = payload.length;
            try {
                this.getSession().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(dm), new WriteCallback(){

                    @Override
                    public void writeFailed(Throwable th) {
                        log.warn("[{}/{}] Failed to deliver msg to {} {}", new Object[]{ConsumerHandler.this.consumer.getTopic(), ConsumerHandler.this.subscription, ConsumerHandler.this.getRemote().getInetSocketAddress().toString(), th.getMessage()});
                        // The message never reached the client, so it no longer counts as pending.
                        ConsumerHandler.this.pendingMessages.decrementAndGet();
                        ConsumerHandler.this.service.getExecutor().execute(() -> ConsumerHandler.this.receiveMessage());
                    }

                    @Override
                    public void writeSuccess() {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}/{}] message is delivered successfully to {} ", new Object[]{ConsumerHandler.this.consumer.getTopic(), ConsumerHandler.this.subscription, ConsumerHandler.this.getRemote().getInetSocketAddress().toString()});
                        }
                        ConsumerHandler.this.updateDeliverMsgStat(msgSize);
                    }
                });
            }
            catch (JsonProcessingException e) {
                // Record the cause before closing; the close itself only carries an error code.
                log.warn("[{}/{}] Failed to serialize message {} to JSON", new Object[]{this.consumer.getTopic(), this.subscription, msg.getMessageId(), e});
                this.close(WebSocketError.FailedToSerializeToJSON);
            }
            int pending = this.pendingMessages.incrementAndGet();
            if (pending < this.maxPendingMessages) {
                // Continue on the executor rather than calling receiveMessage() from inside this callback.
                this.service.getExecutor().execute(this::receiveMessage);
            }
        }).exceptionally(exception -> {
            if (exception.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                log.info("[{}/{}] Consumer was closed while receiving msg from broker", (Object)this.consumer.getTopic(), (Object)this.subscription);
            } else {
                log.warn("[{}/{}] Error occurred while consumer handler was delivering msg to {}: {}", new Object[]{this.consumer.getTopic(), this.subscription, this.getRemote().getInetSocketAddress().toString(), exception.getMessage()});
            }
            return null;
        });
    }

    /**
     * Starts the delivery loop as soon as the session is connected, unless the client asked for
     * pull mode (in which case delivery is started from {@code handlePermit}).
     *
     * @param session the newly connected WebSocket session
     */
    @Override
    public void onWebSocketConnect(Session session) {
        super.onWebSocketConnect(session);
        if (this.pullMode) {
            return;
        }
        receiveMessage();
    }

    /**
     * Parses an incoming text frame as a {@link ConsumerCommand} and dispatches it by its
     * {@code type}. Any type other than the four recognised ones (including a missing type) is
     * treated as an acknowledgement.
     *
     * <p>An {@link IOException} from parsing or from a handler closes the session with
     * {@code FailedToDeserializeFromJSON}.
     *
     * @param message raw JSON text received from the client
     */
    @Override
    public void onWebSocketText(String message) {
        super.onWebSocketText(message);
        try {
            ConsumerCommand cmd = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class);
            // switch on a String rejects null, so a missing type is mapped to a value that hits default.
            String type = cmd.type == null ? "" : cmd.type;
            switch (type) {
                case "permit":
                    handlePermit(cmd);
                    break;
                case "unsubscribe":
                    handleUnsubscribe(cmd);
                    break;
                case "negativeAcknowledge":
                    handleNack(cmd);
                    break;
                case "isEndOfTopic":
                    handleEndOfTopic();
                    break;
                default:
                    handleAck(cmd);
                    break;
            }
        } catch (IOException e) {
            log.warn("Failed to deserialize message id: {}", message, e);
            close(WebSocketError.FailedToDeserializeFromJSON);
        }
    }

    /**
     * Replies to an "isEndOfTopic" command with a JSON-encoded {@link EndOfTopicResponse} built from
     * {@code consumer.hasReachedEndOfTopic()}. Failures are logged and otherwise ignored.
     */
    private void handleEndOfTopic() {
        try {
            EndOfTopicResponse reply = new EndOfTopicResponse(this.consumer.hasReachedEndOfTopic());
            String json = ObjectMapperFactory.getThreadLocal().writeValueAsString(reply);
            WriteCallback onWritten = new WriteCallback() {

                @Override
                public void writeFailed(Throwable th) {
                    log.warn("[{}/{}] Failed to send end of topic msg to {} due to {}",
                            ConsumerHandler.this.consumer.getTopic(),
                            ConsumerHandler.this.subscription,
                            ConsumerHandler.this.getRemote().getInetSocketAddress().toString(),
                            th.getMessage());
                }

                @Override
                public void writeSuccess() {
                    if (!log.isDebugEnabled()) {
                        return;
                    }
                    log.debug("[{}/{}] End of topic message is delivered successfully to {} ",
                            ConsumerHandler.this.consumer.getTopic(),
                            ConsumerHandler.this.subscription,
                            ConsumerHandler.this.getRemote().getInetSocketAddress().toString());
                }
            };
            getSession().getRemote().sendString(json, onWritten);
        } catch (JsonProcessingException e) {
            log.warn("[{}] Failed to generate end of topic response: {}", this.consumer.getTopic(), e.getMessage());
        } catch (Exception e) {
            log.warn("[{}] Failed to send end of topic response: {}", this.consumer.getTopic(), e.getMessage());
        }
    }

    /**
     * Handles an "unsubscribe" command by unsubscribing the underlying consumer.
     *
     * @param command the received command (not inspected)
     * @throws PulsarClientException if the consumer fails to unsubscribe
     */
    private void handleUnsubscribe(ConsumerCommand command) throws PulsarClientException {
        consumer.unsubscribe();
    }

    /**
     * In push (non-pull) mode, releases one pending slot and restarts the receive loop when the
     * count was at or above {@code maxPendingMessages} before the release. Does nothing, and does
     * not touch the counter, in pull mode.
     */
    private void checkResumeReceive() {
        if (this.pullMode) {
            return;
        }
        int pendingBefore = this.pendingMessages.getAndDecrement();
        if (pendingBefore >= this.maxPendingMessages) {
            receiveMessage();
        }
    }

    /**
     * Acknowledges the message identified by {@code command.messageId} (Base64-encoded message id
     * bytes) and then gives the receive loop a chance to resume.
     *
     * @param command the received command carrying the message id
     * @throws IOException if the message id is missing, is not valid Base64, or cannot be
     *                     deserialized into a {@link MessageId}
     */
    private void handleAck(ConsumerCommand command) throws IOException {
        if (command.messageId == null) {
            throw new IOException("Missing required messageId field for acknowledge command");
        }
        final byte[] rawId;
        try {
            rawId = Base64.getDecoder().decode(command.messageId);
        } catch (IllegalArgumentException e) {
            // Surface malformed client input as IOException so the caller's existing handler closes
            // the session cleanly instead of letting an unchecked exception escape.
            throw new IOException("Invalid Base64 messageId: " + command.messageId, e);
        }
        MessageId msgId = MessageId.fromByteArrayWithTopic(rawId, this.topic.toString());
        this.consumer.acknowledgeAsync(msgId).thenAccept(ignored -> this.numMsgsAcked.increment());
        this.checkResumeReceive();
    }

    /**
     * Negatively acknowledges the message identified by {@code command.messageId} (Base64-encoded
     * message id bytes) and then gives the receive loop a chance to resume.
     *
     * @param command the received command carrying the message id
     * @throws IOException if the message id is missing, is not valid Base64, or cannot be
     *                     deserialized into a {@link MessageId}
     */
    private void handleNack(ConsumerCommand command) throws IOException {
        if (command.messageId == null) {
            throw new IOException("Missing required messageId field for 'negativeAcknowledge' command");
        }
        final byte[] rawId;
        try {
            rawId = Base64.getDecoder().decode(command.messageId);
        } catch (IllegalArgumentException e) {
            // Surface malformed client input as IOException so the caller's existing handler closes
            // the session cleanly instead of letting an unchecked exception escape.
            throw new IOException("Invalid Base64 messageId: " + command.messageId, e);
        }
        MessageId msgId = MessageId.fromByteArrayWithTopic(rawId, this.topic.toString());
        if (log.isDebugEnabled()) {
            log.debug("[{}/{}] Received negative acknowledgement for message {}", new Object[]{this.consumer.getTopic(), this.subscription, msgId});
        }
        this.consumer.negativeAcknowledge(msgId);
        this.checkResumeReceive();
    }

    /**
     * Handles a "permit" command. In pull mode the permit count is subtracted from
     * {@code pendingMessages}, and the receive loop is started when the count was non-negative
     * before the subtraction. Outside pull mode the command is validated but otherwise ignored.
     *
     * @param command the received command; {@code permitMessages} must be set
     * @throws IOException if {@code permitMessages} is missing
     */
    private void handlePermit(ConsumerCommand command) throws IOException {
        if (command.permitMessages == null) {
            throw new IOException("Missing required permitMessages field for 'permit' command");
        }
        if (!this.pullMode) {
            return;
        }
        int permits = command.permitMessages.intValue();
        int pendingBefore = this.pendingMessages.getAndAdd(-permits);
        if (pendingBefore >= 0) {
            receiveMessage();
        }
    }

    /**
     * Unregisters this handler from the service and closes the underlying consumer asynchronously.
     * Does nothing when no consumer was created.
     *
     * @throws IOException declared by the overridden method; not thrown by this implementation
     */
    @Override
    public void close() throws IOException {
        if (this.consumer == null) {
            return;
        }
        boolean removed = this.service.removeConsumer(this);
        if (!removed) {
            log.warn("[{}] Failed to remove consumer handler", this.consumer.getTopic());
        }
        this.consumer.closeAsync()
                .thenAccept(x -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Closed consumer asynchronously", this.consumer.getTopic());
                    }
                })
                .exceptionally(exception -> {
                    log.warn("[{}] Failed to close consumer", (Object) this.consumer.getTopic(), exception);
                    return null;
                });
    }

    /**
     * @return the underlying Pulsar consumer, or {@code null} if none was created
     */
    public Consumer<byte[]> getConsumer() {
        return consumer;
    }

    /**
     * @return the subscription name extracted from the request URI
     */
    public String getSubscription() {
        return subscription;
    }

    /**
     * @return the subscription type taken from the consumer configuration
     */
    public SubscriptionType getSubscriptionType() {
        return subscriptionType;
    }

    /**
     * @return the subscription mode taken from the consumer configuration
     */
    public SubscriptionMode getSubscriptionMode() {
        return subscriptionMode;
    }

    /**
     * @return the number of messages delivered since the previous call; the counter is reset
     */
    public long getAndResetNumMsgsDelivered() {
        return numMsgsDelivered.sumThenReset();
    }

    /**
     * @return the number of payload bytes delivered since the previous call; the counter is reset
     */
    public long getAndResetNumBytesDelivered() {
        return numBytesDelivered.sumThenReset();
    }

    /**
     * @return the number of messages acknowledged since the previous call; the counter is reset
     */
    public long getAndResetNumMsgsAcked() {
        return numMsgsAcked.sumThenReset();
    }

    /**
     * @return the number of messages delivered so far; this class never resets the value
     */
    public long getMsgDeliveredCounter() {
        // Volatile read through the same updater that performs the increments.
        return MSG_DELIVERED_COUNTER_UPDATER.get(this);
    }

    /**
     * Records one successfully delivered message in the delivery statistics.
     *
     * @param msgSize payload size of the delivered message, in bytes
     */
    protected void updateDeliverMsgStat(long msgSize) {
        numMsgsDelivered.add(1L);
        MSG_DELIVERED_COUNTER_UPDATER.incrementAndGet(this);
        numBytesDelivered.add(msgSize);
    }

    /**
     * Builds a consumer configuration from the request's query parameters.
     *
     * <p>Recognised parameters: {@code ackTimeoutMillis}, {@code subscriptionType},
     * {@code subscriptionMode}, {@code receiverQueueSize} (capped at 1000), {@code consumerName},
     * {@code priorityLevel}, {@code maxRedeliverCount}, {@code deadLetterTopic},
     * {@code negativeAckRedeliveryDelay} (milliseconds) and {@code cryptoFailureAction}.
     *
     * @param client Pulsar client used to create the builder
     * @return the configured builder; topic and subscription name are not set here
     * @throws IllegalArgumentException if {@code subscriptionType} or {@code subscriptionMode} is
     *                                  not a valid enum constant
     * @throws NumberFormatException    if a numeric parameter cannot be parsed
     */
    protected ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client) {
        ConsumerBuilder<byte[]> builder = client.newConsumer();
        if (this.queryParams.containsKey("ackTimeoutMillis")) {
            builder.ackTimeout(Integer.parseInt((String)this.queryParams.get("ackTimeoutMillis")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("subscriptionType")) {
            Preconditions.checkArgument(Enums.getIfPresent(SubscriptionType.class, (String)this.queryParams.get("subscriptionType")).isPresent(), "Invalid subscriptionType %s", this.queryParams.get("subscriptionType"));
            builder.subscriptionType(SubscriptionType.valueOf((String)this.queryParams.get("subscriptionType")));
        }
        if (this.queryParams.containsKey("subscriptionMode")) {
            Preconditions.checkArgument(Enums.getIfPresent(SubscriptionMode.class, (String)this.queryParams.get("subscriptionMode")).isPresent(), "Invalid subscriptionMode %s", this.queryParams.get("subscriptionMode"));
            builder.subscriptionMode(SubscriptionMode.valueOf((String)this.queryParams.get("subscriptionMode")));
        }
        if (this.queryParams.containsKey("receiverQueueSize")) {
            builder.receiverQueueSize(Math.min(Integer.parseInt((String)this.queryParams.get("receiverQueueSize")), 1000));
        }
        if (this.queryParams.containsKey("consumerName")) {
            builder.consumerName((String)this.queryParams.get("consumerName"));
        }
        if (this.queryParams.containsKey("priorityLevel")) {
            builder.priorityLevel(Integer.parseInt((String)this.queryParams.get("priorityLevel")));
        }
        // The nack redelivery delay is independent of the dead-letter policy, so it is applied
        // whether or not any dead-letter parameter is present.
        if (this.queryParams.containsKey("negativeAckRedeliveryDelay")) {
            builder.negativeAckRedeliveryDelay(Integer.parseInt((String)this.queryParams.get("negativeAckRedeliveryDelay")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("maxRedeliverCount") || this.queryParams.containsKey("deadLetterTopic")) {
            DeadLetterPolicy.DeadLetterPolicyBuilder dlpBuilder = DeadLetterPolicy.builder();
            if (this.queryParams.containsKey("maxRedeliverCount")) {
                // Default dead-letter topic name; an explicit deadLetterTopic below overrides it.
                dlpBuilder.maxRedeliverCount(Integer.parseInt((String)this.queryParams.get("maxRedeliverCount"))).deadLetterTopic(String.format("%s-%s-DLQ", this.topic, this.subscription));
            }
            if (this.queryParams.containsKey("deadLetterTopic")) {
                dlpBuilder.deadLetterTopic((String)this.queryParams.get("deadLetterTopic"));
            }
            builder.deadLetterPolicy(dlpBuilder.build());
        }
        if (this.queryParams.containsKey("cryptoFailureAction")) {
            String action = (String)this.queryParams.get("cryptoFailureAction");
            try {
                builder.cryptoFailureAction(ConsumerCryptoFailureAction.valueOf(action));
            }
            catch (Exception e) {
                // Best effort: an unrecognised action is logged and the builder's setting is left as is.
                log.warn("Failed to configure cryptoFailureAction {} , {}", (Object)action, (Object)e.getMessage());
            }
        }
        return builder;
    }

    /**
     * Asks the authorization service whether the given role may consume from this handler's topic
     * using this handler's subscription.
     *
     * @param authRole           role of the connecting client
     * @param authenticationData authentication data of the connecting client
     * @return the result of {@code canConsume(...)}
     * @throws Exception if the authorization check fails
     */
    @Override
    protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
        return service.getAuthorizationService().canConsume(topic, authRole, authenticationData, subscription);
    }

    /**
     * Extracts the decoded subscription name from the request URI.
     *
     * <p>The URI must split on {@code "/"} into exactly nine segments, with {@code "ws"} at index 1,
     * a domain of {@code "persistent"} or {@code "non-persistent"} at index 4 (when index 2 is
     * {@code "v2"}) or index 3 (otherwise), and a non-empty subscription at index 8.
     *
     * @param request the HTTP upgrade request
     * @return the subscription segment after {@code Codec.decode}
     * @throws IllegalArgumentException if the URI does not match the expected layout
     */
    public static String extractSubscription(HttpServletRequest request) {
        List<String> segments = Splitter.on("/").splitToList(request.getRequestURI());
        Preconditions.checkArgument(segments.size() == 9, "Invalid topic name format");
        Preconditions.checkArgument("ws".equals(segments.get(1)));

        int domainIndex = "v2".equals(segments.get(2)) ? 4 : 3;
        String domain = segments.get(domainIndex);
        Preconditions.checkArgument("persistent".equals(domain) || "non-persistent".equals(domain));

        String encodedSubscription = segments.get(8);
        Preconditions.checkArgument(encodedSubscription.length() > 0, "Empty subscription name");
        return Codec.decode(encodedSubscription);
    }
}

