/*
 * Decompiled with CFR 0.152.
 */
package edu.stanford.protege.webprotege.ipc.pulsar;

import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.common.Event;
import edu.stanford.protege.webprotege.ipc.EventHandler;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarEventHandlerWrapper<E extends Event>
implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(PulsarEventHandlerWrapper.class);
    private final String applicationName;
    private final EventHandler<E> eventHandler;
    private final ObjectMapper objectMapper;
    private final PulsarClient pulsarClient;
    private Consumer<byte[]> consumer;
    private final String tenant;

    public PulsarEventHandlerWrapper(String applicationName, String tenant, EventHandler<E> eventHandler, ObjectMapper objectMapper, PulsarClient pulsarClient) {
        this.applicationName = applicationName;
        this.eventHandler = eventHandler;
        this.objectMapper = objectMapper;
        this.pulsarClient = pulsarClient;
        this.tenant = tenant;
    }

    public void subscribe() {
        if (this.consumer != null) {
            logger.info("Already subscribed.  Not subscribing again.");
            return;
        }
        try {
            String eventTopicUrl = this.tenant + "/events/" + this.eventHandler.getChannelName();
            String subscriptionName = this.getSubscriptionName();
            this.consumer = this.pulsarClient.newConsumer().subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).topic(new String[]{eventTopicUrl}).messageListener(this::handleMessage).subscribe();
        }
        catch (PulsarClientException e) {
            throw new UncheckedIOException((IOException)((Object)e));
        }
    }

    private String getSubscriptionName() {
        return this.applicationName + "-" + this.eventHandler.getChannelName() + "-" + this.eventHandler.getHandlerName();
    }

    @Override
    public void close() throws IOException {
        this.consumer.close();
    }

    private void handleMessage(Consumer<byte[]> consumer, Message<byte[]> msg) {
        try {
            Event event = (Event)this.objectMapper.readValue(msg.getData(), this.eventHandler.getEventClass());
            consumer.acknowledge(msg);
            this.handleEvent(event);
        }
        catch (IOException e) {
            logger.error("Could not parse event on channel {} with class {}", new Object[]{this.eventHandler.getChannelName(), this.eventHandler.getEventClass().getName(), e});
            consumer.negativeAcknowledge(msg);
        }
    }

    private void handleEvent(E event) {
        try {
            this.eventHandler.handleEvent(event);
        }
        catch (Exception e) {
            logger.warn("Caught unhandled exception thrown from event handler. Event hangler name: {}.  Message: {}", new Object[]{this.eventHandler.getHandlerName(), e.getMessage(), e});
        }
    }
}

