/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.azure.eventhubs;

import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.azure.eventhubs.EventHubsCheckpointUpdaterTimerTask;
import org.apache.camel.component.azure.eventhubs.EventHubsConfiguration;
import org.apache.camel.component.azure.eventhubs.EventHubsEndpoint;
import org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventHubsConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(EventHubsConsumer.class);
    private EventProcessorClient processorClient;
    private final AtomicInteger processedEvents = new AtomicInteger();
    private final Timer timer = new Timer();
    private EventHubsCheckpointUpdaterTimerTask lastTask;

    public EventHubsConsumer(EventHubsEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.processorClient = EventHubsClientFactory.createEventProcessorClient(this.getConfiguration(), this::onEventListener, this::onErrorListener);
        this.processorClient.start();
    }

    protected void doStop() throws Exception {
        if (this.processorClient != null) {
            this.processorClient.stop();
            this.processorClient = null;
        }
        super.doStop();
    }

    public EventHubsConfiguration getConfiguration() {
        return this.getEndpoint().getConfiguration();
    }

    public EventHubsEndpoint getEndpoint() {
        return (EventHubsEndpoint)super.getEndpoint();
    }

    private Exchange createAzureEventHubExchange(EventContext eventContext) {
        Exchange exchange = this.createExchange(true);
        Message message = exchange.getIn();
        message.setBody((Object)eventContext.getEventData().getBody());
        message.setHeader("CamelAzureEventHubsPartitionId", (Object)eventContext.getPartitionContext().getPartitionId());
        message.setHeader("CamelAzureEventHubsPartitionKey", (Object)eventContext.getEventData().getPartitionKey());
        message.setHeader("CamelAzureEventHubsOffset", (Object)eventContext.getEventData().getOffset());
        message.setHeader("CamelAzureEventHubsEnqueuedTime", (Object)eventContext.getEventData().getEnqueuedTime());
        message.setHeader("CamelAzureEventHubsSequenceNumber", (Object)eventContext.getEventData().getSequenceNumber());
        if (eventContext.getEventData().getEnqueuedTime() != null) {
            long ts = eventContext.getEventData().getEnqueuedTime().getEpochSecond() * 1000L;
            message.setHeader("CamelMessageTimestamp", (Object)ts);
        }
        message.setHeader("CamelAzureEventHubsMetadata", (Object)eventContext.getEventData().getProperties());
        return exchange;
    }

    private Exchange createAzureEventHubExchange(ErrorContext errorContext) {
        Exchange exchange = this.createExchange(true);
        Message message = exchange.getIn();
        message.setHeader("CamelAzureEventHubsPartitionId", (Object)errorContext.getPartitionContext().getPartitionId());
        exchange.setException(errorContext.getThrowable());
        return exchange;
    }

    private void onEventListener(final EventContext eventContext) {
        Exchange exchange = this.createAzureEventHubExchange(eventContext);
        exchange.getExchangeExtension().addOnCompletion(new Synchronization(){

            public void onComplete(Exchange exchange) {
                EventHubsConsumer.this.processCommit(exchange, eventContext);
            }

            public void onFailure(Exchange exchange) {
                EventHubsConsumer.this.processRollback(exchange);
            }
        });
        AsyncCallback cb = this.defaultConsumerCallback(exchange, true);
        this.getAsyncProcessor().process(exchange, cb);
    }

    private void onErrorListener(ErrorContext errorContext) {
        Exchange exchange = this.createAzureEventHubExchange(errorContext);
        if (exchange.getException() != null) {
            this.getExceptionHandler().handleException("Error processing exchange", exchange, (Throwable)exchange.getException());
        }
    }

    private void processCommit(Exchange exchange, EventContext eventContext) {
        if (this.lastTask == null || System.currentTimeMillis() > this.lastTask.scheduledExecutionTime()) {
            this.lastTask = new EventHubsCheckpointUpdaterTimerTask(eventContext, this.processedEvents);
            this.timer.schedule((TimerTask)this.lastTask, this.getConfiguration().getCheckpointBatchTimeout());
        } else {
            this.lastTask.setEventContext(eventContext);
        }
        try {
            int cnt = this.processedEvents.incrementAndGet();
            if (cnt == this.getConfiguration().getCheckpointBatchSize()) {
                this.processedEvents.set(0);
                exchange.getIn().setHeader("CamelAzureEventHubsCheckpointUpdatedBy", (Object)"size");
                LOG.debug("eventhub consumer batch size of reached");
                if (this.lastTask != null) {
                    this.lastTask.cancel();
                }
                eventContext.updateCheckpointAsync().subscribe(unused -> LOG.debug("Processed one event..."), error -> LOG.debug("Error when updating Checkpoint: {}", (Object)error.getMessage()), () -> LOG.debug("Checkpoint updated."));
            } else if (System.currentTimeMillis() >= this.lastTask.scheduledExecutionTime()) {
                exchange.getIn().setHeader("CamelAzureEventHubsCheckpointUpdatedBy", (Object)"timeout");
                LOG.debug("eventhub consumer batch timeout reached");
            } else {
                LOG.debug("neither eventhub consumer batch size of {}/{} nor batch timeout reached yet", (Object)cnt, (Object)this.getConfiguration().getCheckpointBatchSize());
            }
        }
        catch (Exception ex) {
            this.getExceptionHandler().handleException("Error occurred during updating the checkpoint. This exception is ignored.", exchange, (Throwable)ex);
        }
    }

    private void processRollback(Exchange exchange) {
        Exception cause = exchange.getException();
        if (cause != null) {
            this.getExceptionHandler().handleException("Error during processing exchange.", exchange, (Throwable)cause);
        }
    }
}

