/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventprocessorhost;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.Lease;
import com.microsoft.azure.eventprocessorhost.PartitionPump;
import com.microsoft.azure.eventprocessorhost.PartitionPumpStatus;
import com.microsoft.azure.servicebus.ReceiverDisconnectedException;
import com.microsoft.azure.servicebus.ServiceBusException;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;

/**
 * Partition pump that reads one partition through an {@link EventHubClient} and an epoch
 * {@link PartitionReceiver}, handing every received batch to {@code onEvents} of the
 * enclosing pump.
 *
 * <p>NOTE(review): {@link #specializedShutdown(CloseReason)} reads fields that
 * {@link #specializedStartPump()} writes, and nothing in this class makes those accesses
 * thread-safe. Whether the two can run concurrently depends on {@code PartitionPump} and
 * its callers, which are not visible here — confirm before relying on it.
 */
class EventHubPartitionPump extends PartitionPump {
    /** Upper bound on the number of attempts made to open the client and receiver. */
    private static final int MAX_OPEN_ATTEMPTS = 5;

    /** The create operation currently being waited on, or {@code null}; cancelled on shutdown. */
    private CompletableFuture<?> internalOperationFuture = null;
    private EventHubClient eventHubClient = null;
    private PartitionReceiver partitionReceiver = null;
    private InternalReceiveHandler internalReceiveHandler = null;

    EventHubPartitionPump(EventProcessorHost host, Lease lease) {
        super(host, lease);
    }

    /**
     * Opens the client and receiver, retrying up to {@link #MAX_OPEN_ATTEMPTS} times, then
     * either attaches the receive handler (status {@code PP_RUNNING}) or reports the failure
     * to the processor and closes down (status {@code PP_CLOSED}).
     *
     * <p>Retrying stops early when the receiver was disconnected on create or when the
     * calling thread was interrupted. Clients left over from a failed attempt are released
     * before the next attempt so that retries do not accumulate open clients.
     */
    @Override
    void specializedStartPump() {
        boolean openedOK = false;
        int retryCount = 0;
        Exception lastException = null;
        do {
            try {
                this.openClients();
                openedOK = true;
            }
            catch (Exception e) {
                lastException = e;
                // openClients() may have created a client (or even a receiver) before failing;
                // the next attempt overwrites those fields, so close them now.
                this.releaseAfterFailedOpen();
                if (e instanceof InterruptedException) {
                    // Restore the flag for callers and stop: a further blocking get() would
                    // only be interrupted again.
                    Thread.currentThread().interrupt();
                    this.host.logWithHostAndPartition(Level.WARNING, this.partitionContext, "Interrupted while creating client or receiver, giving up", (Throwable) e);
                    break;
                }
                if (e instanceof ExecutionException && e.getCause() instanceof ReceiverDisconnectedException) {
                    // Retrying is pointless in this case, so give up immediately.
                    this.host.logWithHostAndPartition(Level.WARNING, this.partitionContext, "Receiver disconnected on create, bad epoch?", (Throwable) e);
                    break;
                }
                this.host.logWithHostAndPartition(Level.WARNING, this.partitionContext, "Failure creating client or receiver, retrying", (Throwable) e);
                ++retryCount;
            }
        } while (!openedOK && retryCount < MAX_OPEN_ATTEMPTS);

        if (!openedOK) {
            this.processor.onError(this.partitionContext, lastException);
            this.pumpStatus = PartitionPumpStatus.PP_OPENFAILED;
        }
        if (this.pumpStatus == PartitionPumpStatus.PP_OPENING) {
            this.internalReceiveHandler = new InternalReceiveHandler();
            // Status is switched to running before the handler is attached.
            this.pumpStatus = PartitionPumpStatus.PP_RUNNING;
            this.partitionReceiver.setReceiveHandler(this.internalReceiveHandler);
        }
        if (this.pumpStatus == PartitionPumpStatus.PP_OPENFAILED) {
            this.pumpStatus = PartitionPumpStatus.PP_CLOSING;
            this.cleanUpClients();
            this.pumpStatus = PartitionPumpStatus.PP_CLOSED;
        }
    }

    /**
     * Creates the Event Hub client and then an epoch receiver for this pump's partition,
     * blocking on each create operation. While an operation is pending it is published in
     * {@link #internalOperationFuture} so that shutdown can cancel it.
     *
     * @throws ServiceBusException declared by the client calls made here
     * @throws IOException declared by the client calls made here
     * @throws InterruptedException if the thread is interrupted while waiting on a create operation
     * @throws ExecutionException if a create operation completes exceptionally
     */
    private void openClients() throws ServiceBusException, IOException, InterruptedException, ExecutionException {
        this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Opening EH client");
        this.internalOperationFuture = EventHubClient.createFromConnectionString(this.host.getEventHubConnectionString());
        this.eventHubClient = (EventHubClient) this.internalOperationFuture.get();
        this.internalOperationFuture = null;

        String startingOffset = this.partitionContext.getInitialOffset();
        long epoch = this.lease.getEpoch();
        this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at offset " + startingOffset);
        this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), startingOffset, epoch);
        // Writes back the value just read; kept as in the original in case the setter has side effects.
        this.lease.setEpoch(epoch);
        this.partitionReceiver = (PartitionReceiver) this.internalOperationFuture.get();
        this.partitionReceiver.setPrefetchCount(this.host.getEventProcessorOptions().getPrefetchCount());
        this.partitionReceiver.setReceiveTimeout(this.host.getEventProcessorOptions().getReceiveTimeOut());
        this.internalOperationFuture = null;
        this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "EH client and receiver creation finished");
    }

    /**
     * Best-effort release of whatever a failed {@link #openClients()} call left behind.
     * A failure during this cleanup is logged and otherwise ignored so that it cannot
     * cut the retry loop short.
     */
    private void releaseAfterFailedOpen() {
        try {
            this.cleanUpClients();
        }
        catch (Exception cleanupFailure) {
            this.host.logWithHostAndPartition(Level.WARNING, this.partitionContext, "Failure cleaning up after failed open attempt", (Throwable) cleanupFailure);
        }
    }

    /**
     * Detaches the receive handler, closes the receiver and then the client, and clears the
     * corresponding fields. Each step is skipped when the corresponding field is
     * {@code null}, so calling this repeatedly is harmless.
     */
    private void cleanUpClients() {
        // Work on local snapshots so a field cleared elsewhere between the null check and the
        // calls below cannot cause a NullPointerException.
        PartitionReceiver receiver = this.partitionReceiver;
        if (receiver != null) {
            // The handler is detached while holding the processing synchronizer.
            synchronized (this.processingSynchronizer) {
                receiver.setReceiveHandler(null);
            }
            this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Closing EH receiver");
            receiver.close();
            this.partitionReceiver = null;
        }
        EventHubClient client = this.eventHubClient;
        if (client != null) {
            this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Closing EH client");
            client.close();
            this.eventHubClient = null;
        }
    }

    /**
     * Cancels any create operation still pending, detaches the receive handler and closes
     * the receiver and client.
     *
     * <p>Cleanup runs even when no receiver exists, so a client that was created without a
     * receiver is still closed.
     *
     * @param reason why the pump is closing; not used by this implementation
     */
    @Override
    void specializedShutdown(CloseReason reason) {
        CompletableFuture<?> captured = this.internalOperationFuture;
        if (captured != null) {
            captured.cancel(true);
        }
        PartitionReceiver receiver = this.partitionReceiver;
        if (receiver != null) {
            receiver.setReceiveHandler(null);
        }
        this.cleanUpClients();
    }

    /**
     * Receive handler attached to the partition receiver; it relays batches and errors to
     * the enclosing pump. The maximum batch size comes from the host's processor options.
     */
    private class InternalReceiveHandler extends PartitionReceiveHandler {
        InternalReceiveHandler() {
            super(EventHubPartitionPump.this.host.getEventProcessorOptions().getMaxBatchSize());
        }

        /** Forwards the received batch to the enclosing pump's {@code onEvents}. */
        public void onReceive(Iterable<EventData> events) {
            EventHubPartitionPump.this.onEvents(events);
        }

        /**
         * Logs the error and marks the pump {@code PP_ERRORED}. A
         * {@link ReceiverDisconnectedException} is only logged as a warning; any other
         * error is logged as severe and also passed to the enclosing pump's {@code onError}.
         *
         * @param error the reported failure; {@code null} is replaced by a placeholder
         */
        public void onError(Throwable error) {
            if (error == null) {
                error = new Throwable("No error info supplied by EventHub client");
            }
            if (error instanceof ReceiverDisconnectedException) {
                EventHubPartitionPump.this.host.logWithHostAndPartition(Level.WARNING, EventHubPartitionPump.this.partitionContext, "EventHub client disconnected, probably another host took the partition");
            } else {
                EventHubPartitionPump.this.host.logWithHostAndPartition(Level.SEVERE, EventHubPartitionPump.this.partitionContext, "EventHub client error: " + error.toString());
                if (error instanceof Exception) {
                    // Second entry carries the throwable itself in addition to its text.
                    EventHubPartitionPump.this.host.logWithHostAndPartition(Level.SEVERE, EventHubPartitionPump.this.partitionContext, "EventHub client error continued", error);
                }
                EventHubPartitionPump.this.onError(error);
            }
            EventHubPartitionPump.this.pumpStatus = PartitionPumpStatus.PP_ERRORED;
        }
    }
}

