/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.azure.eventhubs.internal.client;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.mulesoft.connectors.azure.eventhubs.internal.client.EventHubConsumerClient;
import com.mulesoft.connectors.azure.eventhubs.internal.error.exception.ConnectivityException;
import com.mulesoft.connectors.azure.eventhubs.internal.source.eventhandler.EventHandler;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultEventHubConsumerClient
implements EventHubConsumerClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventHubConsumerClient.class);
    private final int checkpointFrequency;
    private final EventHandler eventHandler;
    private final EventProcessorClientBuilder builder;
    private EventProcessorClient client;

    public DefaultEventHubConsumerClient(EventProcessorClientBuilder builder, int checkpointFrequency, EventHandler eventHandler) {
        this.builder = builder;
        this.checkpointFrequency = checkpointFrequency;
        this.eventHandler = eventHandler;
    }

    public int getCheckpointFrequency() {
        return this.checkpointFrequency;
    }

    public EventHandler getEventHandler() {
        return this.eventHandler;
    }

    public EventProcessorClientBuilder getBuilder() {
        return this.builder;
    }

    public EventProcessorClient getClient() {
        return this.client;
    }

    public void setClient(EventProcessorClient client) {
        this.client = client;
    }

    @Override
    public void consume() {
        this.client = this.builder.processEvent(this::eventContextHandler).processError(this::errorContextHandler).buildEventProcessorClient();
        this.client.start();
    }

    @Override
    public void close() {
        this.client.stop(Duration.ofSeconds(100L));
    }

    private void eventContextHandler(EventContext eventContext) {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData event = eventContext.getEventData();
        LOGGER.info("Starting Event Processor in partition {} and Sequence Number {}", (Object)partitionContext.getPartitionId(), (Object)event.getSequenceNumber());
        if (event.getSequenceNumber() % (long)this.checkpointFrequency == 0L) {
            LOGGER.debug("[subscribe] Saving Checkpoint in partition {}, Sequence Number {} and Consumer Group {}", new Object[]{partitionContext.getPartitionId(), event.getSequenceNumber(), partitionContext.getConsumerGroup()});
            eventContext.updateCheckpointAsync().doOnSuccess(result -> LOGGER.debug("Checkpoint saved successfully: partition {}, Sequence Number {}, Consumer Group {}", new Object[]{partitionContext.getPartitionId(), event.getSequenceNumber(), partitionContext.getConsumerGroup()})).doOnError(error -> LOGGER.error("Checkpoint not saved : partition {}, Sequence Number {}, Consumer Group {}, error {} ", new Object[]{partitionContext.getPartitionId(), event.getSequenceNumber(), partitionContext.getConsumerGroup(), error.getMessage()})).subscribe();
        }
        this.eventHandler.handle(event);
    }

    private void errorContextHandler(ErrorContext errorContext) {
        Throwable throwable = errorContext.getThrowable();
        if (throwable.getClass().isAssignableFrom(TimeoutException.class)) {
            throw new ConnectivityException("Connectivity issue by a Timeout Exception", throwable);
        }
        LOGGER.error("Error occurred in Event Processor for partition {}, {}", new Object[]{errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable().getMessage(), errorContext.getThrowable()});
    }
}

