/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventHubAsyncClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.EventProcessorClientOptions;
import com.azure.messaging.eventhubs.PartitionBasedLoadBalancer;
import com.azure.messaging.eventhubs.PartitionPumpManager;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsConsumerInstrumentation;
import com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentedCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Client that periodically runs a {@link PartitionBasedLoadBalancer} on a dedicated single-threaded scheduler
 * and delegates per-partition work to a {@link PartitionPumpManager}.
 *
 * <p>Instances are created by {@link EventProcessorClientBuilder}. {@link #start()}, {@link #stop()},
 * {@link #stop(Duration)} and {@link #isRunning()} are {@code synchronized} on this instance, so lifecycle
 * transitions do not interleave.</p>
 */
@ServiceClient(builder=EventProcessorClientBuilder.class)
public class EventProcessorClient {
    /** Upper bound, in seconds, of the random delay applied before the first load-balancing run. */
    private static final long BASE_JITTER_IN_SECONDS = 2L;
    /** Timeout used by the no-argument {@link #stop()}. */
    private static final Duration DEFAULT_STOP_TIMEOUT = Duration.ofSeconds(10L);
    private final ClientLogger logger;
    private final String identifier;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final PartitionPumpManager partitionPumpManager;
    private final PartitionBasedLoadBalancer partitionBasedLoadBalancer;
    private final CheckpointStore checkpointStore;
    /** Handle of the currently scheduled load-balancing task; set by a successful {@link #start()}. */
    private final AtomicReference<ScheduledFuture<?>> runner = new AtomicReference<>();
    /** Executor that runs the load-balancing task; a new one is created on every successful {@link #start()}. */
    private final AtomicReference<ScheduledExecutorService> scheduler = new AtomicReference<>();
    private final String fullyQualifiedNamespace;
    private final String eventHubName;
    private final String consumerGroup;
    private final Duration loadBalancerUpdateInterval;
    private final EventProcessorClientOptions processorClientOptions;

    /**
     * Package-private constructor used by the builder.
     *
     * @param eventHubClientBuilder builder used to create the {@link EventHubAsyncClient}; also handed to the
     *     partition pump manager.
     * @param partitionProcessorFactory supplier of {@link PartitionProcessor} instances.
     * @param checkpointStore store used for ownership and checkpoints; it is wrapped in an
     *     {@link InstrumentedCheckpointStore}.
     * @param processError callback handed to the load balancer.
     * @param tracer tracer passed to the consumer instrumentation.
     * @param meter meter passed to the consumer instrumentation.
     * @param processorClientOptions processor options; its consumer group must be non-null.
     * @throws NullPointerException if {@code eventHubClientBuilder}, {@code processorClientOptions}, its consumer
     *     group, {@code partitionProcessorFactory} or {@code checkpointStore} is null.
     */
    EventProcessorClient(EventHubClientBuilder eventHubClientBuilder, Supplier<PartitionProcessor> partitionProcessorFactory, CheckpointStore checkpointStore, Consumer<ErrorContext> processError, Tracer tracer, Meter meter, EventProcessorClientOptions processorClientOptions) {
        // Validate every argument before building the async client so that a bad argument
        // does not leave a freshly built client behind.
        Objects.requireNonNull(eventHubClientBuilder, "eventHubClientBuilder cannot be null.");
        this.processorClientOptions = Objects.requireNonNull(processorClientOptions, "processorClientOptions cannot be null.");
        Objects.requireNonNull(processorClientOptions.getConsumerGroup(), "'consumerGroup' cannot be null.");
        Objects.requireNonNull(partitionProcessorFactory, "partitionProcessorFactory cannot be null.");
        Objects.requireNonNull(checkpointStore, "checkpointStore cannot be null");

        EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient();
        this.identifier = eventHubAsyncClient.getIdentifier();

        HashMap<String, Object> loggingContext = new HashMap<>();
        loggingContext.put("eventProcessorId", this.identifier);
        this.logger = new ClientLogger(EventProcessorClient.class, loggingContext);

        // Namespace, hub name and consumer group are stored lower-cased (locale independent).
        this.fullyQualifiedNamespace = eventHubAsyncClient.getFullyQualifiedNamespace().toLowerCase(Locale.ROOT);
        this.eventHubName = eventHubAsyncClient.getEventHubName().toLowerCase(Locale.ROOT);
        this.consumerGroup = processorClientOptions.getConsumerGroup().toLowerCase(Locale.ROOT);
        this.loadBalancerUpdateInterval = processorClientOptions.getLoadBalancerUpdateInterval();

        EventHubsConsumerInstrumentation instrumentation = new EventHubsConsumerInstrumentation(tracer, meter, this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroup, true);
        this.checkpointStore = InstrumentedCheckpointStore.create(checkpointStore, instrumentation);
        this.partitionPumpManager = new PartitionPumpManager(this.checkpointStore, partitionProcessorFactory, eventHubClientBuilder, instrumentation, processorClientOptions);
        this.partitionBasedLoadBalancer = new PartitionBasedLoadBalancer(this.checkpointStore, eventHubAsyncClient, this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroup, this.identifier, processorClientOptions.getPartitionOwnershipExpirationInterval().getSeconds(), this.partitionPumpManager, processError, processorClientOptions.getLoadBalancingStrategy());
    }

    /**
     * @return the options this processor was constructed with.
     */
    EventProcessorClientOptions getEventProcessorClientOptions() {
        return this.processorClientOptions;
    }

    /**
     * @return the lower-cased consumer group name.
     */
    String getConsumerGroup() {
        return this.consumerGroup;
    }

    /**
     * @return the lower-cased Event Hub name.
     */
    String getEventHubName() {
        return this.eventHubName;
    }

    /**
     * @return the lower-cased fully qualified namespace.
     */
    String getFullyQualifiedNamespace() {
        return this.fullyQualifiedNamespace;
    }

    /**
     * Gets the identifier of this processor, taken from the underlying {@link EventHubAsyncClient}.
     *
     * @return the identifier of this event processor.
     */
    public String getIdentifier() {
        return this.identifier;
    }

    /**
     * Starts periodic load balancing on a new single-threaded scheduler.
     *
     * <p>The first run is delayed by a random jitter of up to {@code BASE_JITTER_IN_SECONDS} seconds; subsequent
     * runs are spaced by the configured load balancer update interval. Calling this method while the processor
     * is already running only logs a message.</p>
     *
     * <p>If scheduling fails, the newly created executor is shut down and the processor is left in the
     * stopped state before the exception is rethrown.</p>
     */
    public synchronized void start() {
        if (!this.isRunning.compareAndSet(false, true)) {
            this.logger.info("Event processor is already running");
            return;
        }
        this.logger.info("Starting a new event processor instance.");
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            long jitterInMillis = (long) (ThreadLocalRandom.current().nextDouble() * TimeUnit.SECONDS.toMillis(BASE_JITTER_IN_SECONDS));
            ScheduledFuture<?> task = executor.scheduleWithFixedDelay(this.partitionBasedLoadBalancer::loadBalance, jitterInMillis, this.loadBalancerUpdateInterval.toMillis(), TimeUnit.MILLISECONDS);
            // Publish the executor and task only once scheduling succeeded.
            this.scheduler.set(executor);
            this.runner.set(task);
        } catch (RuntimeException e) {
            // Roll back: do not leak the executor thread or report a processor as running that never started.
            executor.shutdownNow();
            this.isRunning.set(false);
            throw e;
        }
    }

    /**
     * Stops the processor using the default stop timeout (10 seconds).
     *
     * <p>Any {@link RuntimeException} raised by {@link #stop(Duration)} is logged and not propagated.</p>
     */
    public synchronized void stop() {
        try {
            this.stop(DEFAULT_STOP_TIMEOUT);
        }
        catch (RuntimeException e) {
            this.logger.info("Error while stopping the event processor", e);
        }
    }

    /**
     * Stops the processor, blocking for at most {@code timeout} on the combined shutdown work.
     *
     * <p>The scheduled load-balancing task is cancelled, then three things are awaited together: termination
     * of the scheduler, stopping of all partition pumps, and re-claiming every ownership record held by this
     * processor with an empty owner id. Errors from stopping the pumps and from clearing ownership are
     * suppressed. Calling this method while the processor is not running only logs a message.</p>
     *
     * @param timeout maximum time to wait for scheduler termination and for the combined shutdown to complete.
     * @throws NullPointerException if {@code timeout} is null.
     */
    public synchronized void stop(Duration timeout) {
        // Validate before touching any state: a null timeout must not leave the processor half-stopped
        // with its scheduler thread still alive.
        Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        if (!this.isRunning.compareAndSet(true, false)) {
            this.logger.info("Event processor has already stopped");
            return;
        }
        this.runner.get().cancel(true);

        Mono<Void> awaitScheduler = Mono.fromRunnable(() -> this.shutdownWithAwait(this.scheduler.get(), timeout.toMillis()));

        // Release ownership: every record owned by this processor is re-claimed with an empty owner id.
        Flux<PartitionOwnership> clearOwnership = this.checkpointStore
            .listOwnership(this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroup)
            .filter(ownership -> this.identifier.equals(ownership.getOwnerId()))
            .map(ownership -> ownership.setOwnerId(""))
            .collect(Collectors.toList())
            .flatMapMany(owned -> this.checkpointStore.claimOwnership(owned).onErrorResume(ex -> Mono.empty()));

        Mono.when(
            awaitScheduler,
            this.partitionPumpManager.stopAllPartitionPumps().onErrorResume(ex -> Mono.empty()),
            clearOwnership.onErrorResume(ex -> Mono.empty()))
            .block(timeout);
    }

    /**
     * @return {@code true} if the processor has been started and not yet stopped.
     */
    public synchronized boolean isRunning() {
        return this.isRunning.get();
    }

    /**
     * Shuts down {@code service} and waits for it to terminate, forcing shutdown if it does not terminate in
     * time or if the waiting thread is interrupted.
     *
     * @param service executor to shut down.
     * @param timeoutMillis maximum time to wait for termination, in milliseconds.
     */
    private void shutdownWithAwait(ExecutorService service, long timeoutMillis) {
        service.shutdown();
        try {
            if (!service.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                service.shutdownNow();
            }
        }
        catch (InterruptedException ex) {
            service.shutdownNow();
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }
}

