/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventprocessorhost;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.Lease;
import com.microsoft.azure.eventprocessorhost.LoggingUtils;
import com.microsoft.azure.eventprocessorhost.PartitionPump;
import com.microsoft.azure.eventprocessorhost.PartitionPumpStatus;
import com.microsoft.azure.eventprocessorhost.Pump;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Partition pump backed by an Event Hubs epoch receiver.
 *
 * <p>On start it opens an {@link EventHubClient} and an epoch {@link PartitionReceiver} for the
 * partition described by the pump's partition context, then installs an
 * {@link InternalReceiveHandler} that forwards received batches to {@code onEvents} and receiver
 * errors to {@code onError} of the enclosing pump. On shutdown or open failure both the receiver
 * and the client are closed.
 */
class EventHubPartitionPump
extends PartitionPump {
    /** Maximum number of attempts {@link #specializedStartPump()} makes to open client and receiver. */
    private static final int MAX_OPEN_ATTEMPTS = 5;
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventHubPartitionPump.class);

    /**
     * The asynchronous create operation {@link #openClients()} is currently waiting on. It is reset
     * to null after a successful wait, and {@link #specializedShutdown(CloseReason)} cancels it.
     */
    private CompletableFuture<?> internalOperationFuture = null;
    private EventHubClient eventHubClient = null;
    private PartitionReceiver partitionReceiver = null;
    private InternalReceiveHandler internalReceiveHandler = null;

    EventHubPartitionPump(EventProcessorHost host, Pump pump, Lease lease) {
        super(host, pump, lease);
    }

    /**
     * Opens the client and receiver, retrying ordinary failures up to {@link #MAX_OPEN_ATTEMPTS}
     * times, and starts delivering events if the pump is still in {@code PP_OPENING}.
     *
     * <p>Three kinds of failure are not retried: a {@link ReceiverDisconnectedException} reported by
     * the create call, an interruption of the current thread, and cancellation of the pending create
     * future (which is what {@link #specializedShutdown(CloseReason)} does). If opening ultimately
     * fails, the last exception is reported to the processor and the pump is cleaned up and moved
     * to {@code PP_CLOSED}.
     */
    @Override
    void specializedStartPump() {
        boolean openedOK = false;
        int failedAttempts = 0;
        Exception lastException = null;
        do {
            try {
                this.openClients();
                openedOK = true;
            }
            catch (Exception e) {
                lastException = e;
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so callers can observe it; retrying would ignore
                    // the interruption request.
                    Thread.currentThread().interrupt();
                    TRACE_LOGGER.warn(this.withPumpContext("Interrupted while creating client or receiver, not retrying"), e);
                    break;
                }
                if (e instanceof CancellationException) {
                    // The pending future was cancelled (specializedShutdown does this to unblock
                    // a stuck open); opening again would defeat that cancellation.
                    TRACE_LOGGER.warn(this.withPumpContext("Client or receiver creation was cancelled, not retrying"), e);
                    break;
                }
                if (e instanceof ExecutionException && e.getCause() instanceof ReceiverDisconnectedException) {
                    // Retrying is skipped here; the message suggests an epoch conflict.
                    TRACE_LOGGER.warn(this.withPumpContext("Receiver disconnected on create, bad epoch?"), e);
                    break;
                }
                TRACE_LOGGER.warn(this.withPumpContext("Failure creating client or receiver, retrying"), e);
                ++failedAttempts;
                if (failedAttempts < MAX_OPEN_ATTEMPTS) {
                    // The next attempt overwrites the client/receiver fields, so close whatever
                    // this attempt managed to open; otherwise it would be leaked.
                    this.releaseClientsBeforeRetry();
                }
            }
        } while (!openedOK && failedAttempts < MAX_OPEN_ATTEMPTS);
        if (!openedOK) {
            this.processor.onError(this.partitionContext, lastException);
            this.pumpStatus = PartitionPumpStatus.PP_OPENFAILED;
        }
        if (this.pumpStatus == PartitionPumpStatus.PP_OPENING) {
            this.internalReceiveHandler = new InternalReceiveHandler();
            // Status is switched to running before the handler is installed, so the handler
            // never runs while the pump still reports PP_OPENING.
            this.pumpStatus = PartitionPumpStatus.PP_RUNNING;
            this.partitionReceiver.setReceiveHandler(this.internalReceiveHandler, this.host.getEventProcessorOptions().getInvokeProcessorAfterReceiveTimeout());
        }
        if (this.pumpStatus == PartitionPumpStatus.PP_OPENFAILED) {
            this.pumpStatus = PartitionPumpStatus.PP_CLOSING;
            this.cleanUpClients();
            this.pumpStatus = PartitionPumpStatus.PP_CLOSED;
        }
    }

    /**
     * Best-effort close of a client/receiver left behind by a failed open attempt.
     * Failures are logged and suppressed so that they cannot abort the retry loop.
     */
    private void releaseClientsBeforeRetry() {
        if (this.partitionReceiver == null && this.eventHubClient == null) {
            return;
        }
        try {
            this.cleanUpClients();
        }
        catch (RuntimeException cleanupFailure) {
            TRACE_LOGGER.warn(this.withPumpContext("Cleanup of partially opened clients failed before retry"), cleanupFailure);
        }
    }

    /** Prefixes a log message with this pump's host name and partition context. */
    private String withPumpContext(String message) {
        return LoggingUtils.withHostAndPartition(this.host.getHostName(), this.partitionContext, message);
    }

    /**
     * Creates the Event Hubs client and an epoch receiver positioned at the partition context's
     * initial offset, blocking on each asynchronous create call.
     *
     * @throws EventHubException declared by the Event Hubs create calls
     * @throws IOException declared by the Event Hubs create calls
     * @throws InterruptedException if the thread is interrupted while waiting on a create future
     * @throws ExecutionException if a create future completes exceptionally
     * @throws RuntimeException if the initial offset is neither a {@code String} nor an
     *         {@link Instant}, or if {@code createEpochReceiver} returns null
     */
    private void openClients() throws EventHubException, IOException, InterruptedException, ExecutionException {
        TRACE_LOGGER.info(this.withPumpContext("Opening EH client"));
        // The pending future is published in a field so specializedShutdown can cancel it.
        this.internalOperationFuture = EventHubClient.createFromConnectionString(this.host.getEventHubConnectionString());
        this.eventHubClient = (EventHubClient)this.internalOperationFuture.get();
        this.internalOperationFuture = null;

        ReceiverOptions options = new ReceiverOptions();
        options.setReceiverRuntimeMetricEnabled(this.host.getEventProcessorOptions().getReceiverRuntimeMetricEnabled());
        Object startAt = this.partitionContext.getInitialOffset();
        long epoch = this.lease.getEpoch();
        TRACE_LOGGER.info(this.withPumpContext("Opening EH receiver with epoch " + epoch + " at location " + startAt));
        if (startAt instanceof String) {
            this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), (String)startAt, epoch, options);
        } else if (startAt instanceof Instant) {
            this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), (Instant)startAt, epoch, options);
        } else {
            String errMsg = "Starting offset is not String or Instant, is " + (startAt != null ? startAt.getClass().toString() : "null");
            TRACE_LOGGER.warn(this.withPumpContext(errMsg));
            throw new RuntimeException(errMsg);
        }
        // NOTE(review): writes back the epoch value that was just read from the lease; kept as-is
        // in case setEpoch has side effects beyond storing the value -- confirm.
        this.lease.setEpoch(epoch);
        if (this.internalOperationFuture == null) {
            TRACE_LOGGER.warn(this.withPumpContext("createEpochReceiver failed with null"));
            throw new RuntimeException("createEpochReceiver failed with null");
        }
        this.partitionReceiver = (PartitionReceiver)this.internalOperationFuture.get();
        this.partitionReceiver.setPrefetchCount(this.host.getEventProcessorOptions().getPrefetchCount());
        this.partitionReceiver.setReceiveTimeout(this.host.getEventProcessorOptions().getReceiveTimeOut());
        this.internalOperationFuture = null;
        TRACE_LOGGER.info(this.withPumpContext("EH client and receiver creation finished"));
    }

    /**
     * Detaches the receive handler and closes the receiver and the client, if present.
     *
     * <p>Each field is nulled before its close call, so a second invocation does not try to close
     * the same object again. {@link EventHubException}s from the close calls are logged, not thrown.
     */
    private void cleanUpClients() {
        if (this.partitionReceiver != null) {
            TRACE_LOGGER.info(this.withPumpContext("Setting receive handler to null"));
            try {
                // Wait for the handler removal to complete before closing the receiver.
                this.partitionReceiver.setReceiveHandler(null).get();
            }
            catch (InterruptedException | ExecutionException exception) {
                if (exception instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                Throwable cause = exception.getCause();
                if (cause != null) {
                    TRACE_LOGGER.info(this.withPumpContext("Got exception from onEvents when ReceiveHandler is set to null."), cause);
                }
            }
            TRACE_LOGGER.info(this.withPumpContext("Closing EH receiver"));
            PartitionReceiver partitionReceiverTemp = this.partitionReceiver;
            this.partitionReceiver = null;
            try {
                partitionReceiverTemp.closeSync();
            }
            catch (EventHubException exception) {
                TRACE_LOGGER.warn(this.withPumpContext("Closing EH receiver failed."), exception);
            }
        } else {
            TRACE_LOGGER.info(this.withPumpContext("partitionReceiver is null in cleanup"));
        }
        if (this.eventHubClient != null) {
            TRACE_LOGGER.info(this.withPumpContext("Closing EH client"));
            EventHubClient eventHubClientTemp = this.eventHubClient;
            this.eventHubClient = null;
            try {
                eventHubClientTemp.closeSync();
            }
            catch (EventHubException exception) {
                // Logged at warn, matching the receiver close failure above.
                TRACE_LOGGER.warn(this.withPumpContext("Closing EH client failed."), exception);
            }
        } else {
            TRACE_LOGGER.info(this.withPumpContext("eventHubClient is null in cleanup"));
        }
    }

    /**
     * Cancels any create operation that {@link #openClients()} is still waiting on and, if a
     * receiver exists, closes the receiver and client.
     *
     * @param reason why the pump is closing; not used by this implementation
     */
    @Override
    void specializedShutdown(CloseReason reason) {
        // Read the field once: openClients resets it to null when a create call completes.
        CompletableFuture<?> captured = this.internalOperationFuture;
        if (captured != null) {
            captured.cancel(true);
        }
        if (this.partitionReceiver != null) {
            this.cleanUpClients();
        }
    }

    /**
     * Receive handler installed on the partition receiver; relays batches and errors to the
     * enclosing pump. The maximum batch size comes from the host's event processor options.
     */
    private class InternalReceiveHandler
    extends PartitionReceiveHandler {
        InternalReceiveHandler() {
            super(EventHubPartitionPump.this.host.getEventProcessorOptions().getMaxBatchSize());
        }

        /**
         * Forwards a received batch to the pump's {@code onEvents}, first refreshing the partition
         * context's runtime information when receiver runtime metrics are enabled.
         *
         * @param events the received batch; a null batch is replaced by an empty one
         */
        public void onReceive(Iterable<EventData> events) {
            if (EventHubPartitionPump.this.host.getEventProcessorOptions().getReceiverRuntimeMetricEnabled()) {
                EventHubPartitionPump.this.partitionContext.setRuntimeInformation(EventHubPartitionPump.this.partitionReceiver.getRuntimeInformation());
            }
            Iterable<EventData> effectiveEvents = events;
            if (effectiveEvents == null) {
                effectiveEvents = new ArrayList<EventData>();
            }
            EventHubPartitionPump.this.onEvents(effectiveEvents);
        }

        /**
         * Marks the pump as errored, logs the failure, and hands it to the pump's {@code onError}
         * on the host's executor service rather than on the calling thread.
         *
         * @param error the receiver failure; null is replaced by a placeholder {@link Throwable}
         */
        public void onError(Throwable error) {
            EventHubPartitionPump.this.pumpStatus = PartitionPumpStatus.PP_ERRORED;
            if (error == null) {
                error = new Throwable("No error info supplied by EventHub client");
            }
            if (error instanceof ReceiverDisconnectedException) {
                TRACE_LOGGER.warn(EventHubPartitionPump.this.withPumpContext("EventHub client disconnected, probably another host took the partition"));
            } else {
                TRACE_LOGGER.warn(EventHubPartitionPump.this.withPumpContext("EventHub client error: " + error.toString()));
                if (error instanceof Exception) {
                    // Second entry carries the stack trace; it is emitted only for Exception subtypes.
                    TRACE_LOGGER.warn(EventHubPartitionPump.this.withPumpContext("EventHub client error continued"), error);
                }
            }
            // The lambda needs an effectively final copy because the parameter may be reassigned above.
            Throwable capturedError = error;
            EventHubPartitionPump.this.host.getExecutorService().submit(() -> EventHubPartitionPump.this.onError(capturedError));
        }
    }
}

