/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.server.eventProcessor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.SynchronizerClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.admin.impl.ReaderGroupManagerImpl;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.ClientFactoryImpl;
import io.pravega.client.stream.impl.Controller;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.Retry;
import io.pravega.controller.eventProcessor.CheckpointConfig;
import io.pravega.controller.eventProcessor.EventProcessorConfig;
import io.pravega.controller.eventProcessor.EventProcessorGroup;
import io.pravega.controller.eventProcessor.EventProcessorSystem;
import io.pravega.controller.eventProcessor.EventSerializer;
import io.pravega.controller.eventProcessor.ExceptionHandler;
import io.pravega.controller.eventProcessor.impl.ConcurrentEventProcessor;
import io.pravega.controller.eventProcessor.impl.EventProcessorGroupConfigImpl;
import io.pravega.controller.eventProcessor.impl.EventProcessorSystemImpl;
import io.pravega.controller.fault.FailoverSweeper;
import io.pravega.controller.server.eventProcessor.ControllerEventProcessorConfig;
import io.pravega.controller.server.eventProcessor.requesthandlers.AbortRequestHandler;
import io.pravega.controller.server.eventProcessor.requesthandlers.AutoScaleTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.CommitRequestHandler;
import io.pravega.controller.server.eventProcessor.requesthandlers.DeleteStreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.ScaleOperationTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.SealStreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.StreamRequestHandler;
import io.pravega.controller.server.eventProcessor.requesthandlers.TruncateStreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.UpdateStreamTask;
import io.pravega.controller.store.checkpoint.CheckpointStore;
import io.pravega.controller.store.checkpoint.CheckpointStoreException;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.controller.util.Config;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.controller.event.AbortEvent;
import io.pravega.shared.controller.event.CommitEvent;
import io.pravega.shared.controller.event.ControllerEvent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service that owns the controller's three event processor groups (commit, abort and
 * stream-request) and exposes them to the failover machinery through {@link FailoverSweeper}.
 *
 * <p>{@link #startUp()} creates the three groups and blocks until each reports running;
 * {@link #shutDown()} stops them and blocks until each has terminated. The streams the groups
 * read from are created separately by {@link #bootstrap}.
 */
public class ControllerEventProcessors
extends AbstractIdleService
implements FailoverSweeper {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(ControllerEventProcessors.class);
    public static final EventSerializer<CommitEvent> COMMIT_EVENT_SERIALIZER = new EventSerializer<>();
    public static final EventSerializer<AbortEvent> ABORT_EVENT_SERIALIZER = new EventSerializer<>();
    public static final EventSerializer<ControllerEvent> CONTROLLER_EVENT_SERIALIZER = new EventSerializer<>();

    // Arguments handed to Retry.indefinitelyWithExpBackoff for every retried operation in this class.
    private static final long DELAY = 100L;
    private static final int MULTIPLIER = 10;
    private static final long MAX_DELAY = 10000L;

    private final String objectId;
    private final ControllerEventProcessorConfig config;
    private final CheckpointStore checkpointStore;
    private final EventProcessorSystem system;
    private final Controller controller;
    private final ClientFactoryImpl clientFactory;
    private final ScheduledExecutorService executor;
    // The three groups stay null until initialize() has created them; every reader null-checks.
    private EventProcessorGroup<CommitEvent> commitEventProcessors;
    private EventProcessorGroup<AbortEvent> abortEventProcessors;
    private EventProcessorGroup<ControllerEvent> requestEventProcessors;
    private final StreamRequestHandler streamRequestHandler;
    private final CommitRequestHandler commitRequestHandler;
    private final AbortRequestHandler abortRequestHandler;

    /**
     * Creates the service with a default {@link EventProcessorSystemImpl}.
     *
     * @param host                           host identifier passed to the event processor system.
     * @param config                         event processor configuration (scope, stream and reader group settings).
     * @param controller                     controller client used to create the scope and streams.
     * @param checkpointStore                checkpoint store handed to each event processor group.
     * @param streamMetadataStore            stream metadata store used by the request handlers.
     * @param bucketStore                    bucket store used by the request handlers.
     * @param connectionFactory              connection factory for the client factory and reader group manager.
     * @param streamMetadataTasks            stream metadata tasks used by the request handlers.
     * @param streamTransactionMetadataTasks transaction metadata tasks used by the request handlers.
     * @param executor                       executor used for all asynchronous work and retries.
     */
    public ControllerEventProcessors(String host, ControllerEventProcessorConfig config, Controller controller, CheckpointStore checkpointStore, StreamMetadataStore streamMetadataStore, BucketStore bucketStore, ConnectionFactory connectionFactory, StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, ScheduledExecutorService executor) {
        this(host, config, controller, checkpointStore, streamMetadataStore, bucketStore, connectionFactory, streamMetadataTasks, streamTransactionMetadataTasks, null, executor);
    }

    /**
     * Same as the public constructor, but lets the caller supply the event processor system.
     *
     * @param system event processor system to use; when {@code null} an {@link EventProcessorSystemImpl}
     *               is built from {@code host}, the configured scope, and the client factory.
     */
    @VisibleForTesting
    ControllerEventProcessors(String host, ControllerEventProcessorConfig config, Controller controller, CheckpointStore checkpointStore, StreamMetadataStore streamMetadataStore, BucketStore bucketStore, ConnectionFactory connectionFactory, StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, EventProcessorSystem system, ScheduledExecutorService executor) {
        this.objectId = "ControllerEventProcessors";
        this.config = config;
        this.checkpointStore = checkpointStore;
        this.controller = controller;
        this.executor = executor;
        this.clientFactory = new ClientFactoryImpl(config.getScopeName(), controller, connectionFactory);
        if (system == null) {
            ReaderGroupManager readerGroupManager = new ReaderGroupManagerImpl(
                    config.getScopeName(), controller, (SynchronizerClientFactory) this.clientFactory, connectionFactory);
            this.system = new EventProcessorSystemImpl(
                    "Controller", host, config.getScopeName(), (EventStreamClientFactory) this.clientFactory, readerGroupManager);
        } else {
            this.system = system;
        }
        this.streamRequestHandler = new StreamRequestHandler(
                new AutoScaleTask(streamMetadataTasks, streamMetadataStore, executor),
                new ScaleOperationTask(streamMetadataTasks, streamMetadataStore, executor),
                new UpdateStreamTask(streamMetadataTasks, streamMetadataStore, bucketStore, executor),
                new SealStreamTask(streamMetadataTasks, streamTransactionMetadataTasks, streamMetadataStore, executor),
                new DeleteStreamTask(streamMetadataTasks, streamMetadataStore, bucketStore, executor),
                new TruncateStreamTask(streamMetadataTasks, streamMetadataStore, executor),
                streamMetadataStore,
                executor);
        this.commitRequestHandler = new CommitRequestHandler(streamMetadataStore, streamMetadataTasks, streamTransactionMetadataTasks, bucketStore, executor);
        this.abortRequestHandler = new AbortRequestHandler(streamMetadataStore, streamMetadataTasks, executor);
    }

    /**
     * Creates the event processor groups and waits for them to be running.
     *
     * @throws Exception if {@link #initialize()} fails.
     */
    @Override
    protected void startUp() throws Exception {
        long traceId = LoggerHelpers.traceEnterWithContext(log, this.objectId, "startUp");
        try {
            log.info("Starting controller event processors");
            this.initialize();
            log.info("Controller event processors startUp complete");
        } finally {
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceId);
        }
    }

    /**
     * Stops every created event processor group and waits for each to terminate.
     */
    @Override
    protected void shutDown() {
        long traceId = LoggerHelpers.traceEnterWithContext(log, this.objectId, "shutDown");
        try {
            log.info("Stopping controller event processors");
            this.stopEventProcessors();
            log.info("Controller event processors shutDown complete");
        } finally {
            LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceId);
        }
    }

    /**
     * @return {@code true} while this service is in the running state.
     */
    @Override
    public boolean isReady() {
        return this.isRunning();
    }

    /**
     * For every event processor group created so far, notifies the group about each process that is
     * registered with it but absent from the supplied set of live processes.
     *
     * @param processes supplier of the currently active process identifiers.
     * @return a future that completes once all groups have been handled.
     */
    @Override
    public CompletableFuture<Void> sweepFailedProcesses(Supplier<Set<String>> processes) {
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
        for (EventProcessorGroup<? extends ControllerEvent> group : this.createdGroups()) {
            futures.add(this.handleOrphanedReaders(group, processes));
        }
        return Futures.allOf(futures);
    }

    /**
     * Notifies every event processor group created so far that {@code process} has failed.
     *
     * @param process identifier of the failed process.
     * @return a future that completes once every group has been notified.
     */
    @Override
    public CompletableFuture<Void> handleFailedProcess(String process) {
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
        for (EventProcessorGroup<? extends ControllerEvent> group : this.createdGroups()) {
            futures.add(this.notifyProcessFailureWithRetries(group, process));
        }
        return Futures.allOf(futures);
    }

    /**
     * Creates the controller scope and then, in parallel, the commit, abort and request streams.
     *
     * @return a future that completes when the scope and all three streams have been created.
     */
    private CompletableFuture<Void> createStreams() {
        StreamConfiguration commitStreamConfig = StreamConfiguration.builder().scalingPolicy(this.config.getCommitStreamScalingPolicy()).build();
        StreamConfiguration abortStreamConfig = StreamConfiguration.builder().scalingPolicy(this.config.getAbortStreamScalingPolicy()).build();
        StreamConfiguration requestStreamConfig = StreamConfiguration.builder().scalingPolicy(this.config.getRequestStreamScalingPolicy()).build();
        String scope = this.config.getScopeName();
        // NOTE(review): the request stream is created as Config.SCALE_STREAM_NAME while bootstrap() and
        // initialize() use config.getRequestStreamName(); confirm the two are always the same name.
        return this.createScope(scope).thenCompose(ignore -> CompletableFuture.allOf(
                this.createStream(scope, this.config.getCommitStreamName(), commitStreamConfig),
                this.createStream(scope, this.config.getAbortStreamName(), abortStreamConfig),
                this.createStream(scope, Config.SCALE_STREAM_NAME, requestStreamConfig)));
    }

    /**
     * Creates {@code scopeName} through the controller, retrying indefinitely with exponential backoff.
     *
     * @param scopeName scope to create.
     * @return a future that completes once the create call has succeeded.
     */
    private CompletableFuture<Void> createScope(String scopeName) {
        return Futures.toVoid(Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY,
                        e -> log.warn("Error creating event processor scope {} with exception {}", scopeName, Exceptions.unwrap(e).toString()))
                .runAsync(() -> this.controller.createScope(scopeName)
                        .thenAccept(x -> log.info("Created controller scope {}", scopeName)), this.executor));
    }

    /**
     * Creates {@code scope/streamName} through the controller, retrying indefinitely with exponential backoff.
     *
     * @param scope        scope that owns the stream.
     * @param streamName   stream to create.
     * @param streamConfig configuration for the new stream.
     * @return a future that completes once the create call has succeeded.
     */
    private CompletableFuture<Void> createStream(String scope, String streamName, StreamConfiguration streamConfig) {
        return Futures.toVoid(Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY,
                        e -> log.warn("Error creating event processor stream {} with exception {}", streamName, Exceptions.unwrap(e).toString()))
                .runAsync(() -> this.controller.createStream(scope, streamName, streamConfig)
                        .thenAccept(x -> log.info("Created stream {}/{}", scope, streamName)), this.executor));
    }

    /**
     * Creates the controller streams and then initializes the stream writers of both task objects.
     *
     * @param streamTransactionMetadataTasks transaction tasks whose writers are initialized with the configuration.
     * @param streamMetadataTasks            stream tasks whose writers are initialized with the request stream name.
     * @return a future that completes when the writers have been initialized.
     */
    public CompletableFuture<Void> bootstrap(StreamTransactionMetadataTasks streamTransactionMetadataTasks, StreamMetadataTasks streamMetadataTasks) {
        log.info("Bootstrapping controller event processors");
        return this.createStreams().thenAcceptAsync(x -> {
            streamMetadataTasks.initializeStreamWriters((EventStreamClientFactory) this.clientFactory, this.config.getRequestStreamName());
            streamTransactionMetadataTasks.initializeStreamWriters((EventStreamClientFactory) this.clientFactory, this.config);
        }, (Executor) this.executor);
    }

    /**
     * Looks up the processes registered with {@code group}, compares them with the active processes
     * returned by {@code processes}, and notifies the group of each registered process that is not active.
     *
     * @param group     event processor group to sweep.
     * @param processes supplier of the currently active process identifiers.
     * @return a future that completes when every orphaned process has been reported to the group.
     */
    private CompletableFuture<Void> handleOrphanedReaders(EventProcessorGroup<? extends ControllerEvent> group, Supplier<Set<String>> processes) {
        CompletableFuture<Set<String>> registered = RetryHelper.withRetriesAsync(
                () -> CompletableFuture.supplyAsync(() -> fetchRegisteredProcesses(group), this.executor),
                RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor);

        // Pair layout: left = active processes, right = processes registered with the group.
        CompletableFuture<ImmutablePair<Set<String>, Set<String>>> paired = registered.thenComposeAsync(
                groupProcesses -> RetryHelper.withRetriesAsync(
                        () -> CompletableFuture.supplyAsync(() -> pairWithActiveProcesses(processes, groupProcesses, group), this.executor),
                        RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor));

        return paired.thenComposeAsync(pair -> this.notifyOrphanedProcesses(group, pair.getLeft(), pair.getRight()));
    }

    /**
     * Reads the processes registered with {@code group}.
     *
     * @return the registered processes, or an empty set when the checkpoint store reports {@code NoNode}.
     * @throws CompletionException wrapping any other {@link CheckpointStoreException}.
     */
    private static Set<String> fetchRegisteredProcesses(EventProcessorGroup<? extends ControllerEvent> group) {
        try {
            return group.getProcesses();
        } catch (CheckpointStoreException e) {
            if (e.getType().equals(CheckpointStoreException.Type.NoNode)) {
                return Collections.emptySet();
            }
            throw new CompletionException(e);
        }
    }

    /**
     * Evaluates the active-process supplier and pairs its result with the group's registered processes.
     *
     * @return a pair whose left element is the active set and whose right element is {@code groupProcesses}.
     * @throws CompletionException wrapping any exception thrown by the supplier.
     */
    private static ImmutablePair<Set<String>, Set<String>> pairWithActiveProcesses(Supplier<Set<String>> processes, Set<String> groupProcesses, EventProcessorGroup<? extends ControllerEvent> group) {
        try {
            return new ImmutablePair<>(processes.get(), groupProcesses);
        } catch (RuntimeException e) {
            log.error(String.format("Error fetching current processes for event processor group %s", group.toString()), e);
            throw new CompletionException(e);
        }
    }

    /**
     * Notifies {@code group} of every registered process that is not in the active set.
     * The registered set is only read, never modified.
     *
     * @param activeProcesses     currently active processes; {@code null} is treated as "none active".
     * @param registeredProcesses processes registered with the group; {@code null} or empty means nothing to do.
     * @return a future that completes when all notifications have completed.
     */
    private CompletableFuture<Void> notifyOrphanedProcesses(EventProcessorGroup<? extends ControllerEvent> group, Set<String> activeProcesses, Set<String> registeredProcesses) {
        if (registeredProcesses == null || registeredProcesses.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        ArrayList<CompletableFuture<Void>> notifications = new ArrayList<>();
        for (String process : registeredProcesses) {
            boolean stillActive = activeProcesses != null && activeProcesses.contains(process);
            if (!stillActive) {
                notifications.add(this.notifyProcessFailureWithRetries(group, process));
            }
        }
        return Futures.allOf(notifications);
    }

    /**
     * Calls {@code group.notifyProcessFailure(process)} on the executor through
     * {@link RetryHelper#withRetriesAsync} with {@link RetryHelper#RETRYABLE_PREDICATE}. A
     * {@link CheckpointStoreException} is logged and rethrown wrapped in a {@link CompletionException}.
     *
     * @return a future that completes when the notification has succeeded.
     */
    private CompletableFuture<Void> notifyProcessFailureWithRetries(EventProcessorGroup<? extends ControllerEvent> group, String process) {
        return RetryHelper.withRetriesAsync(() -> CompletableFuture.runAsync(() -> {
            try {
                group.notifyProcessFailure(process);
            } catch (CheckpointStoreException e) {
                log.error(String.format("Error notifying failure of process=%s in event processor group %s", process, group.toString()), e);
                throw new CompletionException(e);
            }
        }, this.executor), RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor);
    }

    /**
     * @return the event processor groups that have been created so far, in commit, abort, request order.
     */
    private ArrayList<EventProcessorGroup<? extends ControllerEvent>> createdGroups() {
        ArrayList<EventProcessorGroup<? extends ControllerEvent>> groups = new ArrayList<>(3);
        if (this.commitEventProcessors != null) {
            groups.add(this.commitEventProcessors);
        }
        if (this.abortEventProcessors != null) {
            groups.add(this.abortEventProcessors);
        }
        if (this.requestEventProcessors != null) {
            groups.add(this.requestEventProcessors);
        }
        return groups;
    }

    /**
     * Builds the commit, abort and request event processor groups (each creation retried indefinitely
     * with exponential backoff) and then blocks until all three report running.
     *
     * @throws Exception if group creation or waiting for startup fails.
     */
    private void initialize() throws Exception {
        // Commit group.
        EventProcessorGroupConfigImpl commitReadersConfig = EventProcessorGroupConfigImpl.builder()
                .streamName(this.config.getCommitStreamName())
                .readerGroupName(this.config.getCommitReaderGroupName())
                .eventProcessorCount(this.config.getCommitReaderGroupSize())
                .checkpointConfig(CheckpointConfig.none())
                .build();
        EventProcessorConfig<CommitEvent> commitConfig = EventProcessorConfig.<CommitEvent>builder()
                .config(commitReadersConfig)
                .decider(ExceptionHandler.DEFAULT_EXCEPTION_HANDLER)
                .serializer(COMMIT_EVENT_SERIALIZER)
                .supplier(() -> new ConcurrentEventProcessor<>(this.commitRequestHandler, this.executor))
                .build();
        log.info("Creating commit event processors");
        Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY, e -> log.warn("Error creating commit event processor group", e)).run(() -> {
            this.commitEventProcessors = this.system.createEventProcessorGroup(commitConfig, this.checkpointStore);
            return null;
        });

        // Abort group.
        EventProcessorGroupConfigImpl abortReadersConfig = EventProcessorGroupConfigImpl.builder()
                .streamName(this.config.getAbortStreamName())
                .readerGroupName(this.config.getAbortReaderGroupName())
                .eventProcessorCount(this.config.getAbortReaderGroupSize())
                .checkpointConfig(CheckpointConfig.none())
                .build();
        EventProcessorConfig<AbortEvent> abortConfig = EventProcessorConfig.<AbortEvent>builder()
                .config(abortReadersConfig)
                .decider(ExceptionHandler.DEFAULT_EXCEPTION_HANDLER)
                .serializer(ABORT_EVENT_SERIALIZER)
                .supplier(() -> new ConcurrentEventProcessor<>(this.abortRequestHandler, this.executor))
                .build();
        log.info("Creating abort event processors");
        Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY, e -> log.warn("Error creating abort event processor group", e)).run(() -> {
            this.abortEventProcessors = this.system.createEventProcessorGroup(abortConfig, this.checkpointStore);
            return null;
        });

        // Request group: configured with a single event processor.
        EventProcessorGroupConfigImpl requestReadersConfig = EventProcessorGroupConfigImpl.builder()
                .streamName(this.config.getRequestStreamName())
                .readerGroupName(this.config.getRequestReaderGroupName())
                .eventProcessorCount(1)
                .checkpointConfig(CheckpointConfig.none())
                .build();
        EventProcessorConfig<ControllerEvent> requestConfig = EventProcessorConfig.<ControllerEvent>builder()
                .config(requestReadersConfig)
                .decider(ExceptionHandler.DEFAULT_EXCEPTION_HANDLER)
                .serializer(CONTROLLER_EVENT_SERIALIZER)
                .supplier(() -> new ConcurrentEventProcessor<>(this.streamRequestHandler, this.executor))
                .build();
        log.info("Creating request event processors");
        Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY, e -> log.warn("Error creating request event processor group", e)).run(() -> {
            this.requestEventProcessors = this.system.createEventProcessorGroup(requestConfig, this.checkpointStore);
            return null;
        });

        log.info("Awaiting start of commit event processors");
        this.commitEventProcessors.awaitRunning();
        log.info("Awaiting start of abort event processors");
        this.abortEventProcessors.awaitRunning();
        log.info("Awaiting start of request event processors");
        this.requestEventProcessors.awaitRunning();
    }

    /**
     * Requests an asynchronous stop of every created group first, and only afterwards waits for each
     * to terminate, so the groups are stopped concurrently rather than one after another.
     */
    private void stopEventProcessors() {
        if (this.commitEventProcessors != null) {
            log.info("Stopping commit event processors");
            this.commitEventProcessors.stopAsync();
        }
        if (this.abortEventProcessors != null) {
            log.info("Stopping abort event processors");
            this.abortEventProcessors.stopAsync();
        }
        if (this.requestEventProcessors != null) {
            log.info("Stopping request event processors");
            this.requestEventProcessors.stopAsync();
        }
        if (this.commitEventProcessors != null) {
            log.info("Awaiting termination of commit event processors");
            this.commitEventProcessors.awaitTerminated();
        }
        if (this.abortEventProcessors != null) {
            log.info("Awaiting termination of abort event processors");
            this.abortEventProcessors.awaitTerminated();
        }
        if (this.requestEventProcessors != null) {
            log.info("Awaiting termination of request event processors");
            this.requestEventProcessors.awaitTerminated();
        }
    }
}

