/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventprocessorhost;

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.ExceptionWithAction;
import com.microsoft.azure.eventprocessorhost.ICheckpointManager;
import com.microsoft.azure.eventprocessorhost.ILeaseManager;
import com.microsoft.azure.eventprocessorhost.Lease;
import com.microsoft.azure.eventprocessorhost.Pump;
import com.microsoft.azure.servicebus.IllegalEntityException;
import com.microsoft.azure.servicebus.ServiceBusException;
import com.microsoft.azure.servicebus.TimeoutException;
import com.microsoft.azure.storage.StorageException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;

/**
 * Drives lease ownership for a single {@link EventProcessorHost}.
 *
 * <p>After {@link #initialize()} has made sure the lease and checkpoint stores exist, a
 * background task repeatedly scans all leases, acquires expired ones, renews the ones this
 * host owns, optionally steals one lease from the host that owns the most, and then asks the
 * {@link Pump} to add or remove partition pumps so that they match the current ownership.
 *
 * <p>{@link #stopPartitions()} may be called from a different thread than the one running the
 * loop; the stop flag is therefore {@code volatile}.
 */
class PartitionManager {
    protected final EventProcessorHost host;
    protected Pump pump;
    /** Cached partition ids; filled by the first successful {@link #getPartitionIds()} call. */
    private String[] partitionIds = null;
    /** Future of the background task submitted by {@link #initialize()}; null until then. */
    private Future<?> partitionsFuture = null;
    /**
     * Loop-continuation flag. Cleared by {@link #stopPartitions()} (caller's thread) and read by
     * {@link #runLoop()} (executor thread), so it must be volatile to guarantee visibility.
     */
    private volatile boolean keepGoing = true;

    /**
     * @param host the host this manager works for; used for configuration, logging and
     *             exception notification
     */
    PartitionManager(EventProcessorHost host) {
        this.host = host;
    }

    /**
     * Returns the partition ids of the event hub, querying the service on first use and
     * caching the answer afterwards.
     *
     * @return the partition ids; never null
     * @throws IllegalEntityException if the ids could not be obtained; the underlying failure,
     *                                when there is one, is attached as the cause
     */
    String[] getPartitionIds() throws IllegalEntityException {
        Throwable saved = null;
        if (this.partitionIds == null) {
            EventHubClient ehClient = null;
            try {
                ehClient = EventHubClient.createFromConnectionStringSync((String)this.host.getEventHubConnectionString());
                EventHubRuntimeInformation ehInfo = (EventHubRuntimeInformation)ehClient.getRuntimeInformation().get();
                if (ehInfo != null) {
                    this.partitionIds = ehInfo.getPartitionIds();
                    this.host.logWithHost(Level.FINE, "Eventhub " + this.host.getEventHubPath() + " count of partitions: " + ehInfo.getPartitionCount());
                    for (String id : this.partitionIds) {
                        this.host.logWithHost(Level.FINER, "Found partition with id: " + id);
                    }
                } else {
                    saved = new TimeoutException("getRuntimeInformation returned null");
                }
            }
            catch (ServiceBusException | IOException | InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Do not lose the interruption request; callers further up may depend on it.
                    Thread.currentThread().interrupt();
                }
                saved = e;
            }
            finally {
                // The client is only needed for this one lookup; release its connection.
                if (ehClient != null) {
                    try {
                        ehClient.closeSync();
                    }
                    catch (ServiceBusException closeFailure) {
                        // Best effort: a failed close must not mask the lookup result.
                        this.host.logWithHost(Level.WARNING, "Failure closing EventHubClient used for partition id lookup", closeFailure);
                    }
                }
            }
        }
        if (this.partitionIds == null) {
            throw new IllegalEntityException("Failure getting partition ids for event hub", saved);
        }
        return this.partitionIds;
    }

    /**
     * Creates the pump used by this manager. Separate method so that a subclass can substitute
     * a different pump.
     *
     * @return a new {@link Pump} bound to this manager's host
     */
    Pump createPumpTestHook() {
        return new Pump(this.host);
    }

    /** Called after the stores were initialized successfully. Does nothing by default. */
    void onInitializeCompleteTestHook() {
    }

    /** Called at the end of every lease scan, before sleeping. Does nothing by default. */
    void onPartitionCheckCompleteTestHook() {
    }

    /**
     * Asks the main loop to stop after its current iteration.
     *
     * @return the future of the background task, or null if {@link #initialize()} has not
     *         submitted one
     */
    Future<?> stopPartitions() {
        this.keepGoing = false;
        return this.partitionsFuture;
    }

    /**
     * Creates the pump, ensures the lease and checkpoint stores exist, and submits the main
     * loop to the host's executor service.
     *
     * @return always null
     * @throws Exception if store initialization fails; the failure is logged and rethrown and
     *                   the main loop is not started
     */
    public Void initialize() throws Exception {
        this.pump = this.createPumpTestHook();
        try {
            this.initializeStores();
            this.onInitializeCompleteTestHook();
        }
        catch (ExceptionWithAction e) {
            this.host.logWithHost(Level.SEVERE, "Exception while initializing stores (" + e.getAction() + "), not starting partition manager", e.getCause());
            throw e;
        }
        catch (Exception e) {
            this.host.logWithHost(Level.SEVERE, "Exception while initializing stores, not starting partition manager", e);
            throw e;
        }
        this.partitionsFuture = EventProcessorHost.getExecutorService().submit(() -> this.runAndCleanUp());
        return null;
    }

    /**
     * Body of the background task: runs the main loop, reports whatever ended it, then removes
     * all pumps, stops the host's executor and waits for the pump removals to complete.
     *
     * @return always null
     */
    private Void runAndCleanUp() {
        try {
            this.runLoop();
            this.host.logWithHost(Level.FINE, "Partition manager main loop exited normally, shutting down");
        }
        catch (ExceptionWithAction e) {
            this.host.logWithHost(Level.SEVERE, "Exception from partition manager main loop, shutting down", e.getCause());
            this.host.getEventProcessorOptions().notifyOfException(this.host.getHostName(), e, e.getAction());
        }
        catch (Exception e) {
            if (e instanceof ExecutionException && e.getCause() instanceof OutOfMemoryError) {
                this.reportOutOfMemoryDiagnostics();
            }
            this.host.logWithHost(Level.SEVERE, "Exception from partition manager main loop, shutting down", e);
            this.host.getEventProcessorOptions().notifyOfException(this.host.getHostName(), e, "Partition Manager Main Loop");
        }

        this.host.logWithHost(Level.FINE, "Shutting down all pumps");
        Iterable<Future<?>> pumpRemovals = this.pump.removeAllPumps(CloseReason.Shutdown);
        this.host.stopExecutor();
        for (Future<?> removal : pumpRemovals) {
            try {
                removal.get();
            }
            catch (InterruptedException | ExecutionException e) {
                this.host.logWithHost(Level.SEVERE, "Failure during shutdown", e);
                this.host.getEventProcessorOptions().notifyOfException(this.host.getHostName(), e, "Partition Manager Cleanup");
                if (e instanceof InterruptedException) {
                    // Restore the flag and abandon the remaining waits.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
        this.host.logWithHost(Level.FINE, "Partition manager exiting");
        return null;
    }

    /**
     * Notifies the host's exception handler of the active thread count and then of every
     * thread's stack trace, one notification per thread.
     */
    private void reportOutOfMemoryDiagnostics() {
        Exception forLogging = new Exception("Got OutOfMemoryError with " + Thread.activeCount() + " threads running");
        this.host.getEventProcessorOptions().notifyOfException(this.host.getHostName(), forLogging, "Partition Manager Main Loop");
        Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
        for (Map.Entry<Thread, StackTraceElement[]> entry : stacks.entrySet()) {
            StringBuilder stackString = new StringBuilder("Thread " + entry.getKey().getId() + ":\n");
            for (StackTraceElement frame : entry.getValue()) {
                stackString.append(frame.toString()).append("\n");
            }
            forLogging = new Exception(stackString.toString());
            this.host.getEventProcessorOptions().notifyOfException(this.host.getHostName(), forLogging, "Partition Manager Main Loop");
        }
    }

    /**
     * Ensures the lease store, one lease per partition, the checkpoint store and one checkpoint
     * per partition all exist, creating whatever is missing. Each creation is attempted up to
     * five times.
     *
     * @throws ExceptionWithAction    if a creation step ran out of retries
     * @throws IllegalEntityException if the partition ids could not be obtained
     * @throws InterruptedException   if interrupted while waiting on a store-existence check
     * @throws ExecutionException     if a store-existence check failed
     */
    private void initializeStores() throws InterruptedException, ExecutionException, ExceptionWithAction, IllegalEntityException {
        ILeaseManager leaseManager = this.host.getLeaseManager();
        if (!leaseManager.leaseStoreExists().get().booleanValue()) {
            this.retryWrapper(() -> leaseManager.createLeaseStoreIfNotExists(), null, "Failure creating lease store for this Event Hub, retrying", "Out of retries creating lease store for this Event Hub", "Creating Lease Store", 5);
        }
        for (String id : this.getPartitionIds()) {
            this.retryWrapper(() -> leaseManager.createLeaseIfNotExists(id), id, "Failure creating lease for partition, retrying", "Out of retries creating lease for partition", "Creating Lease", 5);
        }
        ICheckpointManager checkpointManager = this.host.getCheckpointManager();
        if (!checkpointManager.checkpointStoreExists().get().booleanValue()) {
            this.retryWrapper(() -> checkpointManager.createCheckpointStoreIfNotExists(), null, "Failure creating checkpoint store for this Event Hub, retrying", "Out of retries creating checkpoint store for this Event Hub", "Creating Checkpoint Store", 5);
        }
        for (String id : this.getPartitionIds()) {
            this.retryWrapper(() -> checkpointManager.createCheckpointIfNotExists(id), id, "Failure creating checkpoint for partition, retrying", "Out of retries creating checkpoint blob for partition", "Creating Checkpoint", 5);
        }
    }

    /**
     * Invokes {@code lambda} and waits for the future it returns, repeating on failure until it
     * succeeds or {@code maxRetries} attempts have failed. At least one attempt is always made.
     * An interruption ends the retrying immediately and restores the thread's interrupt flag.
     *
     * @param lambda              operation to run; its returned future is awaited
     * @param partitionId         partition to mention in log entries, or null for host-level logging
     * @param retryMessage        logged at WARNING after every failed attempt
     * @param finalFailureMessage logged at SEVERE, and used as the exception message, on giving up
     * @param action              action name carried by the thrown {@link ExceptionWithAction}
     * @param maxRetries          maximum number of failed attempts tolerated
     * @throws ExceptionWithAction if the operation never succeeded; it wraps a
     *                             {@link RuntimeException} whose cause is the last failure seen
     */
    private void retryWrapper(Callable<Future<?>> lambda, String partitionId, String retryMessage, String finalFailureMessage, String action, int maxRetries) throws ExceptionWithAction {
        boolean createdOK = false;
        int retryCount = 0;
        Exception lastFailure = null;
        do {
            try {
                lambda.call().get();
                createdOK = true;
            }
            catch (Exception e) {
                lastFailure = e;
                if (partitionId != null) {
                    this.host.logWithHostAndPartition(Level.WARNING, partitionId, retryMessage, (Throwable)e);
                } else {
                    this.host.logWithHost(Level.WARNING, retryMessage, e);
                }
                ++retryCount;
                if (e instanceof InterruptedException) {
                    // Someone asked this thread to stop: restore the flag and give up now
                    // instead of burning the remaining retries.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } while (!createdOK && retryCount < maxRetries);

        if (!createdOK) {
            if (partitionId != null) {
                this.host.logWithHostAndPartition(Level.SEVERE, partitionId, finalFailureMessage);
            } else {
                this.host.logWithHost(Level.SEVERE, finalFailureMessage);
            }
            // Keep the last underlying failure reachable through the cause chain.
            throw new ExceptionWithAction(new RuntimeException(finalFailureMessage, lastFailure), action);
        }
    }

    /**
     * Main loop. Until {@link #stopPartitions()} is called, each iteration:
     * <ol>
     *   <li>fetches every lease, acquiring expired ones and renewing those owned by this host;</li>
     *   <li>if other hosts own leases, possibly steals one (see {@link #whichLeasesToSteal});</li>
     *   <li>adds a pump for every lease now owned by this host and removes the pump for every
     *       other lease, waiting for each removal;</li>
     *   <li>sleeps for the lease manager's renew interval.</li>
     * </ol>
     *
     * @throws Exception any failure not handled per lease; an interrupted sleep is rethrown as
     *                   a {@link RuntimeException} after restoring the interrupt flag
     */
    private void runLoop() throws Exception, ExceptionWithAction {
        while (this.keepGoing) {
            ILeaseManager leaseManager = this.host.getLeaseManager();
            HashMap<String, Lease> allLeases = new HashMap<String, Lease>();
            Iterable<Future<Lease>> gettingAllLeases = leaseManager.getAllLeases();
            ArrayList<Lease> leasesOwnedByOthers = new ArrayList<Lease>();
            int ourLeasesCount = 0;

            // Step 1: inspect every lease; take expired ones, renew our own.
            for (Future<Lease> future : gettingAllLeases) {
                Lease possibleLease = null;
                try {
                    possibleLease = future.get();
                    boolean ours;
                    if (possibleLease.isExpired()) {
                        ours = leaseManager.acquireLease(possibleLease).get().booleanValue();
                    } else if (possibleLease.getOwner().compareTo(this.host.getHostName()) == 0) {
                        ours = leaseManager.renewLease(possibleLease).get().booleanValue();
                    } else {
                        ours = false;
                    }
                    // Recorded only after acquire/renew returned, so a lease whose
                    // acquire/renew threw is skipped entirely for this iteration.
                    allLeases.put(possibleLease.getPartitionId(), possibleLease);
                    if (ours) {
                        ++ourLeasesCount;
                    } else {
                        leasesOwnedByOthers.add(possibleLease);
                    }
                }
                catch (StorageException | ExecutionException e) {
                    this.host.logWithHost(Level.WARNING, "Failure getting/acquiring/renewing lease, skipping", e);
                    // Prefer reporting the underlying exception over the ExecutionException wrapper.
                    Throwable notifyWith = e;
                    if (e instanceof ExecutionException && e.getCause() instanceof Exception) {
                        notifyWith = (Exception)e.getCause();
                    }
                    this.host.getEventProcessorOptions().notifyOfException(this.host.getHostName(), (Exception)notifyWith, "Checking Leases", possibleLease != null ? possibleLease.getPartitionId() : "N/A");
                }
            }

            // Step 2: balance load by stealing, if warranted.
            if (leasesOwnedByOthers.size() > 0) {
                Iterable<Lease> stealTheseLeases = this.whichLeasesToSteal(leasesOwnedByOthers, ourLeasesCount);
                if (stealTheseLeases != null) {
                    for (Lease stealee : stealTheseLeases) {
                        try {
                            if (leaseManager.acquireLease(stealee).get().booleanValue()) {
                                this.host.logWithHostAndPartition(Level.FINE, stealee.getPartitionId(), "Stole lease");
                                allLeases.put(stealee.getPartitionId(), stealee);
                                ++ourLeasesCount;
                            } else {
                                this.host.logWithHost(Level.WARNING, "Failed to steal lease for partition " + stealee.getPartitionId());
                            }
                        }
                        catch (ExecutionException e) {
                            this.host.logWithHost(Level.SEVERE, "Exception stealing lease for partition " + stealee.getPartitionId(), e);
                            this.host.getEventProcessorOptions().notifyOfException(this.host.getHostName(), e, "Stealing Lease", stealee.getPartitionId());
                        }
                    }
                }
            }

            // Step 3: make the set of running pumps match current lease ownership.
            for (Map.Entry<String, Lease> entry : allLeases.entrySet()) {
                String partitionId = entry.getKey();
                Lease updatedLease = entry.getValue();
                this.host.logWithHost(Level.FINER, "Lease on partition " + updatedLease.getPartitionId() + " owned by " + updatedLease.getOwner());
                if (updatedLease.getOwner().compareTo(this.host.getHostName()) == 0) {
                    this.pump.addPump(partitionId, updatedLease);
                } else {
                    Future<?> removing = this.pump.removePump(partitionId, CloseReason.LeaseLost);
                    if (removing != null) {
                        removing.get();
                    }
                }
            }

            this.onPartitionCheckCompleteTestHook();

            // Step 4: wait until the next renewal is due.
            try {
                Thread.sleep(leaseManager.getLeaseRenewIntervalInMilliseconds());
            }
            catch (InterruptedException e) {
                this.host.logWithHost(Level.WARNING, "Sleep was interrupted", e);
                this.keepGoing = false;
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Decides which leases to try to steal. The owner holding the most of the given leases is
     * found; if it holds at least two more than this host, the first of its leases in
     * {@code stealableLeases} is proposed.
     *
     * @param stealableLeases leases not owned by this host
     * @param haveLeaseCount  number of leases this host currently owns
     * @return a one-element collection with the lease to steal, an empty collection if the
     *         threshold is met but no lease matched, or null if nothing should be stolen
     */
    private Iterable<Lease> whichLeasesToSteal(ArrayList<Lease> stealableLeases, int haveLeaseCount) {
        HashMap<String, Integer> countsByOwner = this.countLeasesByOwner(stealableLeases);
        String biggestOwner = this.findBiggestOwner(countsByOwner);
        if (biggestOwner == null) {
            // No identifiable owner (e.g. empty input): nothing to steal. Without this guard
            // the unboxing below, or the owner comparison, would throw NullPointerException.
            return null;
        }
        int biggestCount = countsByOwner.get(biggestOwner);
        ArrayList<Lease> stealTheseLeases = null;
        if (biggestCount - haveLeaseCount >= 2) {
            stealTheseLeases = new ArrayList<Lease>();
            for (Lease l : stealableLeases) {
                if (l.getOwner().compareTo(biggestOwner) == 0) {
                    stealTheseLeases.add(l);
                    this.host.logWithHost(Level.FINER, "Proposed to steal lease for partition " + l.getPartitionId() + " from " + biggestOwner);
                    // Only one lease is stolen per pass.
                    break;
                }
            }
        }
        return stealTheseLeases;
    }

    /**
     * Finds the owner with the strictly largest positive count. On ties the owner encountered
     * first in the map's iteration order wins.
     *
     * @param countsByOwner lease count per owner
     * @return the owner with the largest count, or null if the map has no positive counts
     */
    private String findBiggestOwner(HashMap<String, Integer> countsByOwner) {
        int biggestCount = 0;
        String biggestOwner = null;
        for (Map.Entry<String, Integer> entry : countsByOwner.entrySet()) {
            int count = entry.getValue();
            if (count > biggestCount) {
                biggestCount = count;
                biggestOwner = entry.getKey();
            }
        }
        return biggestOwner;
    }

    /**
     * Tallies the given leases by owner and logs the tally at FINER level.
     *
     * @param leases leases to count
     * @return a new map from owner to the number of leases that owner holds
     */
    private HashMap<String, Integer> countLeasesByOwner(Iterable<Lease> leases) {
        HashMap<String, Integer> counts = new HashMap<String, Integer>();
        for (Lease l : leases) {
            counts.merge(l.getOwner(), 1, Integer::sum);
        }
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            this.host.log(Level.FINER, "host " + entry.getKey() + " owns " + entry.getValue() + " leases");
        }
        this.host.log(Level.FINER, "total hosts in sorted list: " + counts.size());
        return counts;
    }
}

