/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.azure.eventhubs.internal.source;

import com.azure.messaging.eventhubs.models.EventPosition;
import com.mulesoft.connectors.azure.eventhubs.api.EventAttributes;
import com.mulesoft.connectors.azure.eventhubs.api.eheventposition.EventHubEventPositionType;
import com.mulesoft.connectors.azure.eventhubs.internal.client.EventHubConsumerClient;
import com.mulesoft.connectors.azure.eventhubs.internal.connection.AzureEventHubsConnection;
import com.mulesoft.connectors.azure.eventhubs.internal.source.eventhandler.DefaultEventHandler;
import java.io.InputStream;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.dsl.xml.ParameterDsl;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.BackPressure;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.runtime.source.BackPressureMode;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;

@MediaType(value="*/*", strict=false)
@ClusterSupport(value=SourceClusterSupport.DEFAULT_ALL_NODES)
@Alias(value="eventhub-listener")
@BackPressure(defaultMode=BackPressureMode.DROP, supportedModes={BackPressureMode.DROP, BackPressureMode.WAIT})
public class EventHubListener
extends Source<InputStream, EventAttributes> {
    public static final int DEFAULT_CHECKPOINT_FREQUENCY = 1000;
    @Parameter
    @Optional
    @Summary(value="The Consumer Group you want to belong to.")
    private String consumerGroup;
    @Parameter
    @Optional
    @Summary(value="The frequency of updating the checkpoint. For instance, every 1000 events received.")
    private Integer checkpointFrequency;
    @Optional
    @Parameter
    @ParameterDsl(allowReferences=false)
    private EventHubEventPositionType eventHubEventPositionType;
    @Connection
    private ConnectionProvider<AzureEventHubsConnection> connectionProvider;
    private EventHubConsumerClient consumer;
    private AzureEventHubsConnection eventHubsConnection;

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    public void setCheckpointFrequency(Integer checkpointFrequency) {
        this.checkpointFrequency = checkpointFrequency;
    }

    public EventHubEventPositionType getEventHubEventPositionType() {
        return this.eventHubEventPositionType;
    }

    public void setEventHubEventPositionType(EventHubEventPositionType eventHubEventPositionType) {
        this.eventHubEventPositionType = eventHubEventPositionType;
    }

    public ConnectionProvider<AzureEventHubsConnection> getConnectionProvider() {
        return this.connectionProvider;
    }

    public void setConnectionProvider(ConnectionProvider<AzureEventHubsConnection> connectionProvider) {
        this.connectionProvider = connectionProvider;
    }

    public EventHubConsumerClient getConsumer() {
        return this.consumer;
    }

    public void setConsumer(EventHubConsumerClient consumer) {
        this.consumer = consumer;
    }

    public AzureEventHubsConnection getEventHubsConnection() {
        return this.eventHubsConnection;
    }

    public void setEventHubsConnection(AzureEventHubsConnection eventHubsConnection) {
        this.eventHubsConnection = eventHubsConnection;
    }

    public void onStart(SourceCallback<InputStream, EventAttributes> sourceCallback) throws ConnectionException {
        this.eventHubsConnection = (AzureEventHubsConnection)this.connectionProvider.connect();
        DefaultEventHandler eventHandler = new DefaultEventHandler(sourceCallback);
        EventPosition eventPosition = this.eventHubEventPositionType != null ? this.eventHubEventPositionType.resolveEventPosition() : EventPosition.latest();
        this.consumer = this.eventHubsConnection.getEventHubConsumer(this.consumerGroup, eventPosition, this.getCheckpointFrequency(), eventHandler);
        this.consumer.consume();
    }

    public void onStop() {
        if (this.consumer != null) {
            this.consumer.close();
        }
        if (this.eventHubsConnection != null) {
            this.connectionProvider.disconnect((Object)this.eventHubsConnection);
        }
    }

    public Integer getCheckpointFrequency() {
        return this.checkpointFrequency == null ? 1000 : this.checkpointFrequency;
    }
}

