/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventprocessorhost;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.Lease;
import com.microsoft.azure.eventprocessorhost.PartitionPump;
import com.microsoft.azure.eventprocessorhost.PartitionPumpStatus;
import com.microsoft.azure.eventprocessorhost.Pump;
import com.microsoft.azure.servicebus.ReceiverDisconnectedException;
import com.microsoft.azure.servicebus.ServiceBusException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;

/**
 * Partition pump that reads from one Event Hub partition through an epoch receiver.
 *
 * <p>Starting the pump opens an {@link EventHubClient} and a {@link PartitionReceiver}
 * and registers an {@link InternalReceiveHandler} on the receiver; the handler forwards
 * received batches and errors to the {@code onEvents}/{@code onError} methods this class
 * inherits. Shutting the pump down cancels any open operation still in flight and
 * closes the receiver and client.
 */
class EventHubPartitionPump
extends PartitionPump {
    /** Upper bound on the number of attempts made to open the client and receiver. */
    private static final int MAX_OPEN_ATTEMPTS = 5;

    /** Future of the open operation currently being waited on, or null when none is pending. */
    private CompletableFuture<?> internalOperationFuture = null;
    private EventHubClient eventHubClient = null;
    private PartitionReceiver partitionReceiver = null;
    private InternalReceiveHandler internalReceiveHandler = null;

    EventHubPartitionPump(EventProcessorHost host, Pump pump, Lease lease) {
        super(host, pump, lease);
    }

    /**
     * Opens the client and receiver, retrying up to {@link #MAX_OPEN_ATTEMPTS} times, and
     * then either starts receiving or tears everything down.
     *
     * <p>Three failures end the retry loop immediately: a receiver disconnect reported while
     * creating the receiver, an interrupt of the calling thread, and cancellation of the
     * pending open future. Anything a failed attempt left open is closed before the next
     * attempt so retries do not accumulate clients or receivers.
     *
     * <p>If no attempt succeeds, the last exception is reported to the processor, the
     * status goes to {@code PP_OPENFAILED} and then, after cleanup, to {@code PP_CLOSED}.
     * If the open succeeds and the status is still {@code PP_OPENING}, the status becomes
     * {@code PP_RUNNING} and the receive handler is installed.
     */
    @Override
    void specializedStartPump() {
        boolean openedOK = false;
        int retryCount = 0;
        Exception lastException = null;
        do {
            if (retryCount > 0) {
                // openClients() overwrites the client/receiver fields, so whatever the
                // previous failed attempt managed to open must be closed first.
                this.cleanUpClients();
            }
            try {
                this.openClients();
                openedOK = true;
            }
            catch (InterruptedException e) {
                lastException = e;
                // Restore the flag for callers further up and stop: retrying would only
                // block this thread again after it was asked to stop.
                Thread.currentThread().interrupt();
                this.host.logWithHostAndPartition(Level.WARNING, this.partitionContext, "Interrupted while creating client or receiver, not retrying", e);
                break;
            }
            catch (Exception e) {
                lastException = e;
                if (e instanceof ExecutionException && e.getCause() instanceof ReceiverDisconnectedException) {
                    // Retrying with the same epoch is not expected to help here.
                    this.host.logWithHostAndPartition(Level.WARNING, this.partitionContext, "Receiver disconnected on create, bad epoch?", e);
                    break;
                }
                if (e instanceof java.util.concurrent.CancellationException) {
                    // specializedShutdown cancels the pending future to abandon the open;
                    // starting another attempt would undo that.
                    this.host.logWithHostAndPartition(Level.WARNING, this.partitionContext, "Client or receiver creation was cancelled, not retrying", e);
                    break;
                }
                this.host.logWithHostAndPartition(Level.WARNING, this.partitionContext, "Failure creating client or receiver, retrying", e);
                ++retryCount;
            }
        } while (!openedOK && retryCount < MAX_OPEN_ATTEMPTS);

        if (!openedOK) {
            this.processor.onError(this.partitionContext, lastException);
            this.pumpStatus = PartitionPumpStatus.PP_OPENFAILED;
        }
        if (this.pumpStatus == PartitionPumpStatus.PP_OPENING) {
            this.internalReceiveHandler = new InternalReceiveHandler();
            // Status is set to running before the handler is installed so that a callback
            // arriving right after registration already sees PP_RUNNING.
            this.pumpStatus = PartitionPumpStatus.PP_RUNNING;
            this.partitionReceiver.setReceiveHandler(this.internalReceiveHandler, this.host.getEventProcessorOptions().getInvokeProcessorAfterReceiveTimeout().booleanValue());
        }
        if (this.pumpStatus == PartitionPumpStatus.PP_OPENFAILED) {
            this.pumpStatus = PartitionPumpStatus.PP_CLOSING;
            this.cleanUpClients();
            this.pumpStatus = PartitionPumpStatus.PP_CLOSED;
        }
    }

    /**
     * Creates the Event Hub client and an epoch receiver positioned at the partition
     * context's initial offset, then applies the configured prefetch count and receive
     * timeout to the receiver.
     *
     * <p>While each asynchronous create is being waited on, its future is kept in
     * {@code internalOperationFuture} so that {@link #specializedShutdown} can cancel it.
     *
     * @throws RuntimeException if the initial offset is neither a {@code String} nor an
     *         {@code Instant}, or if receiver creation returns a null future
     * @throws InterruptedException if the thread is interrupted while waiting on a future
     * @throws ExecutionException if client or receiver creation completes exceptionally
     */
    private void openClients() throws ServiceBusException, IOException, InterruptedException, ExecutionException {
        this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH client");
        this.internalOperationFuture = EventHubClient.createFromConnectionString(this.host.getEventHubConnectionString());
        this.eventHubClient = (EventHubClient)this.internalOperationFuture.get();
        this.internalOperationFuture = null;

        ReceiverOptions options = new ReceiverOptions();
        options.setReceiverRuntimeMetricEnabled(this.host.getEventProcessorOptions().getReceiverRuntimeMetricEnabled());
        Object startAt = this.partitionContext.getInitialOffset();
        long epoch = this.lease.getEpoch();
        this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + startAt);
        if (startAt instanceof String) {
            this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), (String)startAt, epoch, options);
        } else if (startAt instanceof Instant) {
            this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), (Instant)startAt, epoch, options);
        } else {
            String errMsg = "Starting offset is not String or Instant, is " + (startAt != null ? startAt.getClass().toString() : "null");
            this.host.logWithHostAndPartition(Level.SEVERE, this.partitionContext, errMsg);
            throw new RuntimeException(errMsg);
        }
        this.lease.setEpoch(epoch);
        if (this.internalOperationFuture == null) {
            this.host.logWithHostAndPartition(Level.SEVERE, this.partitionContext, "createEpochReceiver failed with null");
            throw new RuntimeException("createEpochReceiver failed with null");
        }
        this.partitionReceiver = (PartitionReceiver)this.internalOperationFuture.get();
        this.partitionReceiver.setPrefetchCount(this.host.getEventProcessorOptions().getPrefetchCount());
        this.partitionReceiver.setReceiveTimeout(this.host.getEventProcessorOptions().getReceiveTimeOut());
        this.internalOperationFuture = null;
        this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "EH client and receiver creation finished");
    }

    /**
     * Detaches the receive handler and closes the receiver and the client, whichever of
     * them exist. Failures while closing are logged and otherwise ignored so that cleanup
     * always runs to the end. Each field is nulled before its close call, so calling this
     * method again afterwards only logs that there is nothing to clean up.
     */
    private void cleanUpClients() {
        if (this.partitionReceiver != null) {
            this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Setting receive handler to null");
            try {
                this.partitionReceiver.setReceiveHandler(null).get();
            }
            catch (InterruptedException | ExecutionException exception) {
                if (exception instanceof InterruptedException) {
                    // Keep the interrupt visible to the caller; cleanup still continues.
                    Thread.currentThread().interrupt();
                }
                Throwable cause = exception.getCause();
                if (cause != null) {
                    this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Got exception from onEvents when ReceiveHandler is set to null.", cause);
                }
            }
            this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Closing EH receiver");
            PartitionReceiver partitionReceiverTemp = this.partitionReceiver;
            this.partitionReceiver = null;
            try {
                partitionReceiverTemp.closeSync();
            }
            catch (ServiceBusException exception) {
                this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Closing EH receiver failed.", exception);
            }
        } else {
            this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "partitionReceiver is null in cleanup");
        }

        if (this.eventHubClient != null) {
            this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Closing EH client");
            EventHubClient eventHubClientTemp = this.eventHubClient;
            this.eventHubClient = null;
            try {
                eventHubClientTemp.closeSync();
            }
            catch (ServiceBusException exception) {
                this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Closing EH client failed.", exception);
            }
        } else {
            this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "eventHubClient is null in cleanup");
        }
    }

    /**
     * Cancels any open operation that is still pending and, when a receiver exists,
     * closes the receiver and client.
     *
     * @param reason why the pump is closing; not used by this implementation
     */
    @Override
    void specializedShutdown(CloseReason reason) {
        // Read the field once: openClients() resets it to null when an operation finishes.
        CompletableFuture<?> captured = this.internalOperationFuture;
        if (captured != null) {
            captured.cancel(true);
        }
        if (this.partitionReceiver != null) {
            this.cleanUpClients();
        }
    }

    /**
     * Receive handler registered on the partition receiver; relays batches and errors to
     * the enclosing pump. The maximum batch size comes from the host's processor options.
     */
    private class InternalReceiveHandler
    extends PartitionReceiveHandler {
        InternalReceiveHandler() {
            super(EventHubPartitionPump.this.host.getEventProcessorOptions().getMaxBatchSize());
        }

        /**
         * Passes a received batch to the pump, substituting an empty list for a null batch.
         * When runtime metrics are enabled, the receiver's runtime information is copied
         * into the partition context first.
         *
         * @param events the received batch; may be null
         */
        public void onReceive(Iterable<EventData> events) {
            if (EventHubPartitionPump.this.host.getEventProcessorOptions().getReceiverRuntimeMetricEnabled()) {
                EventHubPartitionPump.this.partitionContext.setRuntimeInformation(EventHubPartitionPump.this.partitionReceiver.getRuntimeInformation());
            }
            Iterable<EventData> effectiveEvents = events;
            if (effectiveEvents == null) {
                effectiveEvents = new ArrayList<EventData>();
            }
            EventHubPartitionPump.this.onEvents(effectiveEvents);
        }

        /**
         * Marks the pump as errored, logs the error, and submits a task to the host's
         * executor service that passes the error to the pump's {@code onError}.
         *
         * @param error the reported error; a placeholder is substituted when null
         */
        public void onError(Throwable error) {
            EventHubPartitionPump.this.pumpStatus = PartitionPumpStatus.PP_ERRORED;
            if (error == null) {
                error = new Throwable("No error info supplied by EventHub client");
            }
            if (error instanceof ReceiverDisconnectedException) {
                EventHubPartitionPump.this.host.logWithHostAndPartition(Level.WARNING, EventHubPartitionPump.this.partitionContext, "EventHub client disconnected, probably another host took the partition");
            } else {
                EventHubPartitionPump.this.host.logWithHostAndPartition(Level.SEVERE, EventHubPartitionPump.this.partitionContext, "EventHub client error: " + error.toString());
                if (error instanceof Exception) {
                    EventHubPartitionPump.this.host.logWithHostAndPartition(Level.SEVERE, EventHubPartitionPump.this.partitionContext, "EventHub client error continued", error);
                }
            }
            // The lambda needs an effectively final reference; `error` is reassigned above.
            Throwable capturedError = error;
            EventProcessorHost.getExecutorService().submit(() -> EventHubPartitionPump.this.onError(capturedError));
        }
    }
}

