/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.integration.eventhubs.inbound;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
import com.azure.spring.cloud.service.listener.MessageListener;
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.core.instrumentation.InstrumentationManager;
import com.azure.spring.integration.eventhubs.implementation.health.EventHubsProcessorInstrumentation;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.implementation.checkpoint.CheckpointManagers;
import com.azure.spring.messaging.eventhubs.implementation.checkpoint.EventCheckpointManager;
import com.azure.spring.messaging.eventhubs.implementation.core.listener.adapter.BatchMessagingMessageListenerAdapter;
import com.azure.spring.messaging.eventhubs.implementation.core.listener.adapter.RecordMessagingMessageListenerAdapter;
import com.azure.spring.messaging.implementation.checkpoint.AzureCheckpointer;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/**
 * Inbound channel adapter that wires an {@link EventHubsMessageListenerContainer} to this
 * message producer: events (or event batches) delivered by the container are converted to
 * Spring {@link Message}s and forwarded through {@code sendMessage}.
 *
 * <p>The adapter registers either a record listener or a batch listener on the container,
 * depending on the {@link ListenerMode} supplied at construction time, and installs an error
 * handler that logs the failure and, when an {@link InstrumentationManager} is configured,
 * updates the matching health instrumentation.
 */
public class EventHubsInboundChannelAdapter
extends MessageProducerSupport {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsInboundChannelAdapter.class);
    /** Header key under which the partition id of the received event(s) is stored. */
    private static final String HEADER_RAW_PARTITION_ID = "azure_raw_partition_id";
    /** Header key under which the last-enqueued-event properties are stored. */
    private static final String HEADER_LAST_ENQUEUED_EVENT_PROPERTIES = "azure_eventhubs_last_enqueued_event_properties";
    /** Header key under which the manual checkpointer is stored (MANUAL checkpoint mode only). */
    private static final String HEADER_CHECKPOINTER = "azure_checkpointer";

    private final EventHubsMessageListenerContainer listenerContainer;
    private final ListenerMode listenerMode;
    private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();
    private final IntegrationBatchMessageListener batchListener = new IntegrationBatchMessageListener();
    private final CheckpointConfig checkpointConfig;
    private EventCheckpointManager checkpointManager;
    private InstrumentationManager instrumentationManager;
    private String instrumentationId;

    /**
     * Creates an adapter that listens in {@link ListenerMode#RECORD} mode.
     *
     * @param listenerContainer the container that delivers Event Hubs events to this adapter
     */
    public EventHubsInboundChannelAdapter(EventHubsMessageListenerContainer listenerContainer) {
        this(listenerContainer, ListenerMode.RECORD);
    }

    /**
     * Creates an adapter for the given listener mode.
     *
     * <p>The checkpoint configuration is read from the container properties; when the container
     * has none, a default {@link CheckpointConfig} is used.
     *
     * @param listenerContainer the container that delivers Event Hubs events to this adapter
     * @param listenerMode whether events are delivered one by one or in batches
     */
    public EventHubsInboundChannelAdapter(EventHubsMessageListenerContainer listenerContainer, ListenerMode listenerMode) {
        this.listenerContainer = listenerContainer;
        this.listenerMode = listenerMode;
        CheckpointConfig containerCheckpointConfig = listenerContainer.getContainerProperties().getCheckpointConfig();
        this.checkpointConfig = containerCheckpointConfig == null ? new CheckpointConfig() : containerCheckpointConfig;
    }

    /**
     * Builds the checkpoint manager and registers the listener, the error handler and the
     * partition lifecycle logging callbacks on the listener container.
     */
    protected void onInit() {
        // Any mode other than BATCH (including null) falls back to the record listener.
        MessageListener<?> listener = ListenerMode.BATCH == this.listenerMode ? this.batchListener : this.recordListener;
        this.checkpointManager = CheckpointManagers.of(this.checkpointConfig, this.listenerMode);
        this.listenerContainer.setupMessageListener(listener);
        this.listenerContainer.setErrorHandler(new IntegrationErrorHandler());
        this.enhanceListenerContainer();
    }

    /** Starts the underlying listener container. */
    public void doStart() {
        this.listenerContainer.start();
    }

    /** Stops the underlying listener container. */
    protected void doStop() {
        this.listenerContainer.stop();
    }

    /**
     * Sets the converter used by the record listener.
     *
     * @param messageConverter converter applied to each received {@link EventData}
     */
    public void setMessageConverter(AzureMessageConverter<EventData, EventData> messageConverter) {
        this.recordListener.setMessageConverter(messageConverter);
    }

    /**
     * Sets the converter used by the batch listener.
     *
     * @param messageConverter converter applied to each received {@link EventBatchContext}
     */
    public void setBatchMessageConverter(AzureMessageConverter<EventBatchContext, EventData> messageConverter) {
        this.batchListener.setMessageConverter(messageConverter);
    }

    /**
     * Sets the target payload type on the listener that matches the configured listener mode.
     *
     * @param payloadType the payload class handed to the message converter
     */
    public void setPayloadType(Class<?> payloadType) {
        if (ListenerMode.BATCH == this.listenerMode) {
            this.batchListener.setPayloadType(payloadType);
        } else {
            this.recordListener.setPayloadType(payloadType);
        }
    }

    /**
     * Sets the instrumentation manager consulted by the error handler.
     *
     * @param instrumentationManager the manager to look health instrumentation up in; may be
     *     left unset, in which case errors are only logged
     */
    public void setInstrumentationManager(InstrumentationManager instrumentationManager) {
        this.instrumentationManager = instrumentationManager;
    }

    /**
     * Sets the id used to look up this adapter's health instrumentation.
     *
     * @param instrumentationId the instrumentation id
     */
    public void setInstrumentationId(String instrumentationId) {
        this.instrumentationId = instrumentationId;
    }

    /** Installs callbacks that log when receiving on a partition starts and stops. */
    private void enhanceListenerContainer() {
        this.listenerContainer.getContainerProperties().setCloseContextConsumer(closeContext ->
            LOGGER.info("Stopped receiving on partition: {}. Reason: {}",
                closeContext.getPartitionContext().getPartitionId(), closeContext.getCloseReason()));
        this.listenerContainer.getContainerProperties().setInitializationContextConsumer(initializationContext ->
            LOGGER.info("Started receiving on partition: {}",
                initializationContext.getPartitionContext().getPartitionId()));
    }

    /** Listener for single events: converts each event to a message and forwards it. */
    private class IntegrationRecordMessageListener
    extends RecordMessagingMessageListenerAdapter {
        private IntegrationRecordMessageListener() {
        }

        /**
         * Converts the received event to a message, sends it, then hands the event context to
         * the checkpoint manager.
         *
         * @param eventContext the context of the received event
         */
        public void onMessage(EventContext eventContext) {
            PartitionContext partition = eventContext.getPartitionContext();
            // Header values are heterogeneous (String, properties object, checkpointer), so the
            // map must be keyed to Object; forcing them to String cannot work.
            Map<String, Object> headers = new HashMap<>();
            headers.put(HEADER_RAW_PARTITION_ID, partition.getPartitionId());
            headers.put(HEADER_LAST_ENQUEUED_EVENT_PROPERTIES, eventContext.getLastEnqueuedEventProperties());
            EventData event = eventContext.getEventData();
            if (CheckpointMode.MANUAL == EventHubsInboundChannelAdapter.this.checkpointConfig.getMode()) {
                // In MANUAL mode the consumer of the message triggers the checkpoint itself.
                AzureCheckpointer checkpointer = new AzureCheckpointer(eventContext::updateCheckpointAsync);
                headers.put(HEADER_CHECKPOINTER, checkpointer);
            }
            Message<?> message = this.getMessageConverter().toMessage(event, new MessageHeaders(headers), this.payloadType);
            EventHubsInboundChannelAdapter.this.sendMessage(message);
            EventHubsInboundChannelAdapter.this.checkpointManager.checkpoint(eventContext);
        }
    }

    /** Listener for event batches: converts each batch to a single message and forwards it. */
    private class IntegrationBatchMessageListener
    extends BatchMessagingMessageListenerAdapter {
        private IntegrationBatchMessageListener() {
        }

        /**
         * Converts the received batch to a message, sends it, and checkpoints through the
         * checkpoint manager only when the checkpoint mode is {@link CheckpointMode#BATCH}.
         *
         * @param eventBatchContext the context of the received batch
         */
        public void onMessage(EventBatchContext eventBatchContext) {
            PartitionContext partition = eventBatchContext.getPartitionContext();
            // See the record listener: header values are not all Strings.
            Map<String, Object> headers = new HashMap<>();
            headers.put(HEADER_RAW_PARTITION_ID, partition.getPartitionId());
            headers.put(HEADER_LAST_ENQUEUED_EVENT_PROPERTIES, eventBatchContext.getLastEnqueuedEventProperties());
            if (CheckpointMode.MANUAL == EventHubsInboundChannelAdapter.this.checkpointConfig.getMode()) {
                // In MANUAL mode the consumer of the message triggers the checkpoint itself.
                AzureCheckpointer checkpointer = new AzureCheckpointer(eventBatchContext::updateCheckpointAsync);
                headers.put(HEADER_CHECKPOINTER, checkpointer);
            }
            Message<?> message = this.getMessageConverter().toMessage(eventBatchContext, new MessageHeaders(headers), this.payloadType);
            EventHubsInboundChannelAdapter.this.sendMessage(message);
            if (EventHubsInboundChannelAdapter.this.checkpointConfig.getMode() == CheckpointMode.BATCH) {
                EventHubsInboundChannelAdapter.this.checkpointManager.checkpoint(eventBatchContext);
            }
        }
    }

    /** Error handler that logs processing errors and reflects them in health instrumentation. */
    private class IntegrationErrorHandler
    implements EventHubsErrorHandler {
        private IntegrationErrorHandler() {
        }

        /**
         * Logs the error and updates the health instrumentation, if one is configured.
         *
         * @param errorContext the error reported by the listener container
         */
        public void accept(ErrorContext errorContext) {
            LOGGER.error("Error occurred on partition: {}. Error: {}",
                errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable());
            this.updateInstrumentation(errorContext);
        }

        /**
         * Marks the instrumentation registered under the configured id as failed. Does nothing
         * when no instrumentation manager is set or no instrumentation is found for the id.
         */
        private void updateInstrumentation(ErrorContext errorContext) {
            InstrumentationManager manager = EventHubsInboundChannelAdapter.this.instrumentationManager;
            if (manager == null) {
                return;
            }
            Instrumentation instrumentation = manager.getHealthInstrumentation(EventHubsInboundChannelAdapter.this.instrumentationId);
            if (instrumentation == null) {
                return;
            }
            if (instrumentation instanceof EventHubsProcessorInstrumentation) {
                ((EventHubsProcessorInstrumentation)instrumentation).markError(errorContext);
            } else {
                instrumentation.setStatus(Instrumentation.Status.DOWN, errorContext.getThrowable());
            }
        }
    }
}

