/*
 * Decompiled with CFR 0.152.
 */
package edu.stanford.protege.webprotege.ipc.pulsar;

import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.ipc.EventRecord;
import edu.stanford.protege.webprotege.ipc.GenericEventHandler;
import java.io.IOException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

public class PulsarGenericEventHandlerWrapper {
    private static final Logger logger = LoggerFactory.getLogger(PulsarGenericEventHandlerWrapper.class);
    private final String applicationName;
    private final String tenant;
    private final PulsarClient pulsarClient;
    private final GenericEventHandler handler;
    private final ObjectMapper objectMapper;

    public PulsarGenericEventHandlerWrapper(@Value(value="${spring.application.name}") String applicationName, @Value(value="${webprotege.pulsar.tenant}") String tenant, PulsarClient pulsarClient, GenericEventHandler handler, ObjectMapper objectMapper) {
        this.applicationName = applicationName;
        this.tenant = tenant;
        this.pulsarClient = pulsarClient;
        this.handler = handler;
        this.objectMapper = objectMapper;
    }

    public void subscribe() {
        try {
            String subscriptionName = this.applicationName + "--" + this.handler.getHandlerName();
            String consumerName = this.applicationName + "--" + this.handler.getHandlerName() + "--Consumer";
            logger.info("Subscribing consumer {} to {}", (Object)consumerName, (Object)"webprotege.events.all");
            String topicUrl = this.tenant + "/events/webprotege.events.all";
            this.pulsarClient.newConsumer().subscriptionType(SubscriptionType.Shared).subscriptionName(subscriptionName).consumerName(consumerName).topic(new String[]{topicUrl}).messageListener(this::handleMessage).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            this.handler.handlerSubscribed();
        }
        catch (PulsarClientException e) {
            logger.error("Could not subscribe to event topics", (Throwable)e);
        }
    }

    private void handleMessage(Consumer<byte[]> consumer, Message<byte[]> message) {
        try {
            String eventType = message.getProperty("webprotege_eventType");
            if (eventType == null) {
                logger.warn("Cound not handle event message because {} header is missing", (Object)"webprotege_eventType");
                consumer.acknowledge(message);
                return;
            }
            EventRecord eventRecord = (EventRecord)this.objectMapper.readValue((byte[])message.getValue(), EventRecord.class);
            consumer.acknowledge(message);
            try {
                this.handler.handleEventRecord(eventRecord);
            }
            catch (Exception e) {
                logger.error("Handled exception thrown by EventRecord handler: {}", (Object)this.handler.getHandlerName(), (Object)e);
            }
        }
        catch (IOException e) {
            logger.error("An error occurred reading an event record", (Throwable)e);
        }
    }
}

