/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.integration.eventhubs.inbound;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.spring.eventhubs.checkpoint.CheckpointManagers;
import com.azure.spring.eventhubs.checkpoint.EventCheckpointManager;
import com.azure.spring.eventhubs.core.EventHubsProcessorContainer;
import com.azure.spring.eventhubs.support.converter.EventHubsBatchMessageConverter;
import com.azure.spring.eventhubs.support.converter.EventHubsMessageConverter;
import com.azure.spring.integration.eventhubs.inbound.health.EventHubsProcessorInstrumentation;
import com.azure.spring.integration.instrumentation.Instrumentation;
import com.azure.spring.integration.instrumentation.InstrumentationManager;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.checkpoint.AzureCheckpointer;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.service.eventhubs.processor.BatchEventProcessingListener;
import com.azure.spring.service.eventhubs.processor.EventProcessingListener;
import com.azure.spring.service.eventhubs.processor.RecordEventProcessingListener;
import com.azure.spring.service.eventhubs.processor.consumer.EventHubsCloseContextConsumer;
import com.azure.spring.service.eventhubs.processor.consumer.EventHubsErrorContextConsumer;
import com.azure.spring.service.eventhubs.processor.consumer.EventHubsInitializationContextConsumer;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;

public class EventHubsInboundChannelAdapter
extends MessageProducerSupport {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsInboundChannelAdapter.class);
    private final EventHubsProcessorContainer processorContainer;
    private final String eventHubName;
    private final String consumerGroup;
    private final ListenerMode listenerMode;
    private final IntegrationRecordEventProcessingListener recordEventProcessor = new IntegrationRecordEventProcessingListener();
    private final IntegrationBatchEventProcessingListener batchEventProcessor = new IntegrationBatchEventProcessingListener();
    private final CheckpointConfig checkpointConfig;
    private InstrumentationEventProcessingListener listener;
    private EventCheckpointManager checkpointManager;
    private Class<?> payloadType;

    public EventHubsInboundChannelAdapter(EventHubsProcessorContainer processorContainer, String eventHubName, String consumerGroup, CheckpointConfig checkpointConfig) {
        this(processorContainer, eventHubName, consumerGroup, ListenerMode.RECORD, checkpointConfig);
    }

    public EventHubsInboundChannelAdapter(EventHubsProcessorContainer eventProcessorsContainer, String eventHubName, String consumerGroup, ListenerMode listenerMode, CheckpointConfig checkpointConfig) {
        Assert.notNull((Object)eventHubName, (String)"eventhubName must be provided");
        Assert.notNull((Object)consumerGroup, (String)"consumerGroup must be provided");
        this.processorContainer = eventProcessorsContainer;
        this.eventHubName = eventHubName;
        this.consumerGroup = consumerGroup;
        this.listenerMode = listenerMode;
        this.checkpointConfig = checkpointConfig;
    }

    protected void onInit() {
        this.listener = ListenerMode.BATCH.equals((Object)this.listenerMode) ? this.batchEventProcessor : this.recordEventProcessor;
        if (this.payloadType != null) {
            this.listener.setPayloadType(this.payloadType);
        }
        this.checkpointManager = CheckpointManagers.of((CheckpointConfig)this.checkpointConfig, (ListenerMode)this.listenerMode);
        this.processorContainer.subscribe(this.eventHubName, this.consumerGroup, (EventProcessingListener)this.listener);
    }

    public void doStart() {
        this.processorContainer.start();
    }

    protected void doStop() {
        this.processorContainer.stop();
    }

    public void setMessageConverter(AzureMessageConverter<EventData, EventData> messageConverter) {
        this.recordEventProcessor.setMessageConverter(messageConverter);
    }

    public void setBatchMessageConverter(AzureMessageConverter<EventBatchContext, EventData> messageConverter) {
        this.batchEventProcessor.setMessageConverter(messageConverter);
    }

    public void setPayloadType(Class<?> payloadType) {
        this.payloadType = payloadType;
    }

    public void setInstrumentationManager(InstrumentationManager instrumentationManager) {
        if (ListenerMode.BATCH.equals((Object)this.listenerMode)) {
            this.batchEventProcessor.setInstrumentationManager(instrumentationManager);
        } else {
            this.recordEventProcessor.setInstrumentationManager(instrumentationManager);
        }
    }

    public void setInstrumentationId(String instrumentationId) {
        if (ListenerMode.BATCH.equals((Object)this.listenerMode)) {
            this.batchEventProcessor.setInstrumentationId(instrumentationId);
        } else {
            this.recordEventProcessor.setInstrumentationId(instrumentationId);
        }
    }

    private class IntegrationBatchEventProcessingListener
    implements InstrumentationEventProcessingListener,
    BatchEventProcessingListener {
        private AzureMessageConverter<EventBatchContext, EventData> messageConverter = new EventHubsBatchMessageConverter();
        private Class<?> payloadType = byte[].class;
        private InstrumentationManager instrumentationManager;
        private String instrumentationId;

        private IntegrationBatchEventProcessingListener() {
        }

        public EventHubsErrorContextConsumer getErrorContextConsumer() {
            return errorContext -> {
                LOGGER.error("Error occurred on partition: {}. Error: {}", (Object)errorContext.getPartitionContext().getPartitionId(), (Object)errorContext.getThrowable());
                this.updateInstrumentation((ErrorContext)errorContext, this.instrumentationManager, this.instrumentationId);
            };
        }

        public EventHubsCloseContextConsumer getCloseContextConsumer() {
            return closeContext -> LOGGER.info("Stopped receiving on partition: {}. Reason: {}", (Object)closeContext.getPartitionContext().getPartitionId(), (Object)closeContext.getCloseReason());
        }

        public EventHubsInitializationContextConsumer getInitializationContextConsumer() {
            return initializationContext -> LOGGER.info("Started receiving on partition: {}", (Object)initializationContext.getPartitionContext().getPartitionId());
        }

        public void setMessageConverter(AzureMessageConverter<EventBatchContext, EventData> converter) {
            this.messageConverter = converter;
        }

        @Override
        public void setPayloadType(Class<?> payloadType) {
            this.payloadType = payloadType;
        }

        public void onEventBatch(EventBatchContext eventBatchContext) {
            PartitionContext partition = eventBatchContext.getPartitionContext();
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("azure_raw_partition_id", partition.getPartitionId());
            headers.put("azure_eventhub_last_enqueued_event_properties", (String)eventBatchContext.getLastEnqueuedEventProperties());
            AzureCheckpointer checkpointer = new AzureCheckpointer(() -> ((EventBatchContext)eventBatchContext).updateCheckpointAsync());
            if (CheckpointMode.MANUAL.equals((Object)EventHubsInboundChannelAdapter.this.checkpointConfig.getMode())) {
                headers.put("azure_checkpointer", (String)checkpointer);
            }
            Message message = this.messageConverter.toMessage((Object)eventBatchContext, (Map)new MessageHeaders(headers), this.payloadType);
            EventHubsInboundChannelAdapter.this.sendMessage(message);
            if (EventHubsInboundChannelAdapter.this.checkpointConfig.getMode().equals((Object)CheckpointMode.BATCH)) {
                EventHubsInboundChannelAdapter.this.checkpointManager.checkpoint(eventBatchContext);
            }
        }

        @Override
        public void setInstrumentationManager(InstrumentationManager instrumentationManager) {
            this.instrumentationManager = instrumentationManager;
        }

        @Override
        public void setInstrumentationId(String instrumentationId) {
            this.instrumentationId = instrumentationId;
        }
    }

    private class IntegrationRecordEventProcessingListener
    implements InstrumentationEventProcessingListener,
    RecordEventProcessingListener {
        private AzureMessageConverter<EventData, EventData> messageConverter = new EventHubsMessageConverter();
        private Class<?> payloadType = byte[].class;
        private InstrumentationManager instrumentationManager;
        private String instrumentationId;

        private IntegrationRecordEventProcessingListener() {
        }

        public EventHubsErrorContextConsumer getErrorContextConsumer() {
            return errorContext -> {
                LOGGER.error("Record event error occurred on partition: {}. Error: {}", (Object)errorContext.getPartitionContext().getPartitionId(), (Object)errorContext.getThrowable());
                this.updateInstrumentation((ErrorContext)errorContext, this.instrumentationManager, this.instrumentationId);
            };
        }

        public void onEvent(EventContext eventContext) {
            PartitionContext partition = eventContext.getPartitionContext();
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("azure_raw_partition_id", partition.getPartitionId());
            headers.put("azure_eventhub_last_enqueued_event_properties", (String)eventContext.getLastEnqueuedEventProperties());
            EventData event = eventContext.getEventData();
            AzureCheckpointer checkpointer = new AzureCheckpointer(() -> ((EventContext)eventContext).updateCheckpointAsync());
            if (CheckpointMode.MANUAL.equals((Object)EventHubsInboundChannelAdapter.this.checkpointConfig.getMode())) {
                headers.put("azure_checkpointer", (String)checkpointer);
            }
            Message message = this.messageConverter.toMessage((Object)event, (Map)new MessageHeaders(headers), this.payloadType);
            EventHubsInboundChannelAdapter.this.sendMessage(message);
            EventHubsInboundChannelAdapter.this.checkpointManager.checkpoint(eventContext);
        }

        public EventHubsCloseContextConsumer getCloseContextConsumer() {
            return closeContext -> LOGGER.info("Stopped receiving on partition: {}. Reason: {}", (Object)closeContext.getPartitionContext().getPartitionId(), (Object)closeContext.getCloseReason());
        }

        public EventHubsInitializationContextConsumer getInitializationContextConsumer() {
            return initializationContext -> LOGGER.info("Started receiving on partition: {}", (Object)initializationContext.getPartitionContext().getPartitionId());
        }

        public void setMessageConverter(AzureMessageConverter<EventData, EventData> converter) {
            this.messageConverter = converter;
        }

        @Override
        public void setPayloadType(Class<?> payloadType) {
            this.payloadType = payloadType;
        }

        @Override
        public void setInstrumentationManager(InstrumentationManager instrumentationManager) {
            this.instrumentationManager = instrumentationManager;
        }

        @Override
        public void setInstrumentationId(String instrumentationId) {
            this.instrumentationId = instrumentationId;
        }
    }

    private static interface InstrumentationEventProcessingListener
    extends EventProcessingListener {
        public void setInstrumentationManager(InstrumentationManager var1);

        public void setInstrumentationId(String var1);

        public void setPayloadType(Class<?> var1);

        default public void updateInstrumentation(ErrorContext errorContext, InstrumentationManager instrumentationManager, String instrumentationId) {
            if (instrumentationManager == null) {
                return;
            }
            Instrumentation instrumentation = instrumentationManager.getHealthInstrumentation(instrumentationId);
            if (instrumentation != null) {
                if (instrumentation instanceof EventHubsProcessorInstrumentation) {
                    ((EventHubsProcessorInstrumentation)instrumentation).markError(errorContext);
                } else {
                    instrumentation.markDown(errorContext.getThrowable());
                }
            }
        }
    }
}

