/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.controller.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.netty.impl.ConnectionFactoryImpl;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.cluster.Cluster;
import io.pravega.common.cluster.Host;
import io.pravega.common.cluster.zkImpl.ClusterZKImpl;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.tracing.RequestTracker;
import io.pravega.common.util.BooleanUtils;
import io.pravega.controller.fault.ControllerClusterListener;
import io.pravega.controller.fault.FailoverSweeper;
import io.pravega.controller.fault.SegmentContainerMonitor;
import io.pravega.controller.fault.UniformContainerBalancer;
import io.pravega.controller.metrics.StreamMetrics;
import io.pravega.controller.metrics.TransactionMetrics;
import io.pravega.controller.server.ControllerService;
import io.pravega.controller.server.ControllerServiceConfig;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.server.ZkSessionExpirationException;
import io.pravega.controller.server.bucket.BucketManager;
import io.pravega.controller.server.bucket.BucketServiceFactory;
import io.pravega.controller.server.bucket.PeriodicRetention;
import io.pravega.controller.server.bucket.PeriodicWatermarking;
import io.pravega.controller.server.eventProcessor.ControllerEventProcessors;
import io.pravega.controller.server.eventProcessor.LocalController;
import io.pravega.controller.server.rest.RESTServer;
import io.pravega.controller.server.rpc.auth.GrpcAuthHelper;
import io.pravega.controller.server.rpc.grpc.GRPCServer;
import io.pravega.controller.server.rpc.grpc.GRPCServerConfig;
import io.pravega.controller.store.checkpoint.CheckpointStore;
import io.pravega.controller.store.checkpoint.CheckpointStoreFactory;
import io.pravega.controller.store.client.StoreClient;
import io.pravega.controller.store.client.StoreType;
import io.pravega.controller.store.host.HostControllerStore;
import io.pravega.controller.store.host.HostStoreFactory;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.StreamStoreFactory;
import io.pravega.controller.store.task.TaskMetadataStore;
import io.pravega.controller.store.task.TaskStoreFactory;
import io.pravega.controller.task.Stream.RequestSweeper;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.controller.task.Stream.TxnSweeper;
import io.pravega.controller.task.TaskSweeper;
import io.pravega.controller.util.Config;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Guava idle service that builds, wires and starts the components of a controller
 * instance (stores, task managers, bucket services, sweepers, event processors,
 * cluster listener, gRPC and REST servers) in {@link #startUp()}, and stops and
 * closes them again in {@link #shutDown()}.
 *
 * <p>The package-private constructors allow tests to inject a {@link SegmentHelper},
 * {@link ConnectionFactory} and {@link StreamMetadataStore}; when one is injected it
 * is used instead of the default implementation.
 */
public class ControllerServiceStarter
extends AbstractIdleService {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(ControllerServiceStarter.class);
    private final ControllerServiceConfig serviceConfig;
    private final StoreClient storeClient;
    /** Context string passed to the trace enter/leave logging helpers. */
    private final String objectId;
    private ScheduledExecutorService controllerExecutor;
    private ScheduledExecutorService retentionExecutor;
    private ScheduledExecutorService watermarkingExecutor;
    private ConnectionFactory connectionFactory;
    private StreamMetadataStore streamStore;
    private StreamMetadataTasks streamMetadataTasks;
    private StreamTransactionMetadataTasks streamTransactionMetadataTasks;
    private BucketManager retentionService;
    private BucketManager watermarkingService;
    private SegmentContainerMonitor monitor;
    private ControllerClusterListener controllerClusterListener;
    private SegmentHelper segmentHelper;
    private ControllerService controllerService;
    private LocalController localController;
    private ControllerEventProcessors controllerEventProcessors;
    /** Released by {@link #setController(LocalController)} once the local controller exists. */
    private final CountDownLatch controllerReadyLatch;
    private GRPCServer grpcServer;
    private RESTServer restServer;
    /** Only created when the controller cluster listener is enabled; otherwise stays null. */
    private Cluster cluster = null;
    private StreamMetrics streamMetrics;
    private TransactionMetrics transactionMetrics;
    /** Optional overrides supplied through the test-only constructors. */
    private final Optional<SegmentHelper> segmentHelperRef;
    private final Optional<ConnectionFactory> connectionFactoryRef;
    private final Optional<StreamMetadataStore> streamMetadataStoreRef;
    /** Completed exceptionally by {@link #notifySessionExpiration()}. */
    @VisibleForTesting
    private final CompletableFuture<Void> storeClientFailureFuture;

    /**
     * Creates a starter that uses the default segment helper, connection factory and stream store.
     *
     * @param serviceConfig configuration describing which controller components to start
     * @param storeClient   client for the backing metadata store
     */
    public ControllerServiceStarter(ControllerServiceConfig serviceConfig, StoreClient storeClient) {
        this(serviceConfig, storeClient, null);
    }

    /**
     * Creates a starter with an optional segment helper override.
     *
     * @param serviceConfig configuration describing which controller components to start
     * @param storeClient   client for the backing metadata store
     * @param segmentHelper segment helper to use, or {@code null} for the default
     */
    @VisibleForTesting
    ControllerServiceStarter(ControllerServiceConfig serviceConfig, StoreClient storeClient, SegmentHelper segmentHelper) {
        this(serviceConfig, storeClient, segmentHelper, null, null);
    }

    /**
     * Creates a starter with optional overrides for the collaborators built during start-up.
     *
     * @param serviceConfig     configuration describing which controller components to start
     * @param storeClient       client for the backing metadata store
     * @param segmentHelper     segment helper to use, or {@code null} for the default
     * @param connectionFactory connection factory to use, or {@code null} for the default
     * @param streamStore       stream metadata store to use, or {@code null} for the default
     */
    @VisibleForTesting
    ControllerServiceStarter(ControllerServiceConfig serviceConfig, StoreClient storeClient, SegmentHelper segmentHelper, ConnectionFactory connectionFactory, StreamMetadataStore streamStore) {
        this.serviceConfig = serviceConfig;
        this.storeClient = storeClient;
        this.objectId = "ControllerServiceStarter";
        this.controllerReadyLatch = new CountDownLatch(1);
        this.segmentHelperRef = Optional.ofNullable(segmentHelper);
        this.connectionFactoryRef = Optional.ofNullable(connectionFactory);
        this.streamMetadataStoreRef = Optional.ofNullable(streamStore);
        this.storeClientFailureFuture = new CompletableFuture<>();
    }

    /**
     * Builds and starts all configured controller components.
     *
     * <p>The gRPC server configuration is dereferenced unconditionally, so start-up
     * fails if it is absent. Any exception is logged and rethrown.
     */
    @Override
    protected void startUp() {
        long traceId = LoggerHelpers.traceEnterWithContext(log, this.objectId, "startUp");
        log.info("Initiating controller service startUp");
        log.info("Controller serviceConfig = {}", this.serviceConfig.toString());
        log.info("Event processors enabled = {}", this.serviceConfig.getEventProcessorConfig().isPresent());
        log.info("Cluster listener enabled = {}", this.serviceConfig.isControllerClusterListenerEnabled());
        log.info("    Host monitor enabled = {}", this.serviceConfig.getHostMonitorConfig().isHostMonitorEnabled());
        log.info("     gRPC server enabled = {}", this.serviceConfig.getGRPCServerConfig().isPresent());
        log.info("     REST server enabled = {}", this.serviceConfig.getRestServerConfig().isPresent());
        try {
            // Thread pools shared by the components created below.
            this.controllerExecutor = ExecutorServiceHelpers.newScheduledThreadPool(this.serviceConfig.getThreadPoolSize(), "controllerpool");
            this.retentionExecutor = ExecutorServiceHelpers.newScheduledThreadPool(Config.RETENTION_THREAD_POOL_SIZE, "retentionpool");
            this.watermarkingExecutor = ExecutorServiceHelpers.newScheduledThreadPool(Config.WATERMARKING_THREAD_POOL_SIZE, "watermarkingpool");

            // Metadata stores.
            log.info("Creating the bucket store");
            BucketStore bucketStore = StreamStoreFactory.createBucketStore(this.storeClient, this.controllerExecutor);
            log.info("Creating the task store");
            TaskMetadataStore taskMetadataStore = TaskStoreFactory.createStore(this.storeClient, this.controllerExecutor);
            log.info("Creating the host store");
            HostControllerStore hostStore = HostStoreFactory.createStore(this.serviceConfig.getHostMonitorConfig(), this.storeClient);
            log.info("Creating the checkpoint store");
            CheckpointStore checkpointStore = CheckpointStoreFactory.create(this.storeClient);

            // Identity of this controller instance; the id is a fresh random UUID per start.
            String hostName = this.getHostName();
            Host host = new Host(hostName, this.getPort(), UUID.randomUUID().toString());
            GRPCServerConfig grpcServerConfig = this.serviceConfig.getGRPCServerConfig().get();
            RequestTracker requestTracker = new RequestTracker(grpcServerConfig.isRequestTracingEnabled());

            if (this.serviceConfig.getHostMonitorConfig().isHostMonitorEnabled()) {
                this.monitor = new SegmentContainerMonitor(hostStore, (CuratorFramework)this.storeClient.getClient(), new UniformContainerBalancer(), this.serviceConfig.getHostMonitorConfig().getHostMonitorMinRebalanceInterval());
                log.info("Starting segment container monitor");
                this.monitor.startAsync();
            }

            // Client configuration pointing back at this controller's own gRPC endpoint.
            String scheme = grpcServerConfig.isTlsEnabled() ? "tls://" : "tcp://";
            ClientConfig.ClientConfigBuilder clientConfigBuilder = ClientConfig.builder()
                    .controllerURI(URI.create(scheme + "localhost:" + grpcServerConfig.getPort()))
                    .trustStore(grpcServerConfig.getTlsTrustStore())
                    .validateHostName(false);
            Optional<?> tlsEnabledForSegmentStore = BooleanUtils.extract(this.serviceConfig.getTlsEnabledForSegmentStore());
            if (tlsEnabledForSegmentStore.isPresent()) {
                clientConfigBuilder.enableTlsToSegmentStore((Boolean)tlsEnabledForSegmentStore.get());
            }
            ClientConfig clientConfig = clientConfigBuilder.build();

            // Defaults are built lazily so that an injected instance does not also cause an
            // unused default to be constructed and then never closed.
            this.connectionFactory = this.connectionFactoryRef.orElseGet(() -> new ConnectionFactoryImpl(clientConfig));
            this.segmentHelper = this.segmentHelperRef.orElseGet(() -> new SegmentHelper(this.connectionFactory, hostStore));
            GrpcAuthHelper authHelper = new GrpcAuthHelper(grpcServerConfig.isAuthorizationEnabled(), grpcServerConfig.getTokenSigningKey(), grpcServerConfig.getAccessTokenTTLInSeconds());
            log.info("Creating the stream store");
            this.streamStore = this.streamMetadataStoreRef.orElseGet(() -> StreamStoreFactory.createStore(this.storeClient, this.segmentHelper, authHelper, this.controllerExecutor));
            this.streamMetadataTasks = new StreamMetadataTasks(this.streamStore, bucketStore, taskMetadataStore, this.segmentHelper, this.controllerExecutor, host.getHostId(), authHelper, requestTracker);
            this.streamTransactionMetadataTasks = new StreamTransactionMetadataTasks(this.streamStore, this.segmentHelper, this.controllerExecutor, host.getHostId(), this.serviceConfig.getTimeoutServiceConfig(), authHelper);

            // Periodic bucket services; each is started and awaited before moving on.
            BucketServiceFactory bucketServiceFactory = new BucketServiceFactory(host.getHostId(), bucketStore, 1000);
            Duration executionDurationRetention = Duration.ofMinutes(Config.MINIMUM_RETENTION_FREQUENCY_IN_MINUTES);
            PeriodicRetention retentionWork = new PeriodicRetention(this.streamStore, this.streamMetadataTasks, this.retentionExecutor, requestTracker);
            this.retentionService = bucketServiceFactory.createRetentionService(executionDurationRetention, retentionWork::retention, this.retentionExecutor);
            log.info("starting background periodic service for retention");
            this.retentionService.startAsync();
            this.retentionService.awaitRunning();
            Duration executionDurationWatermarking = Duration.ofSeconds(Config.MINIMUM_WATERMARKING_FREQUENCY_IN_SECONDS);
            PeriodicWatermarking watermarkingWork = new PeriodicWatermarking(this.streamStore, bucketStore, clientConfig, this.watermarkingExecutor);
            this.watermarkingService = bucketServiceFactory.createWatermarkingService(executionDurationWatermarking, watermarkingWork::watermark, this.watermarkingExecutor);
            log.info("starting background periodic service for watermarking");
            this.watermarkingService.startAsync();
            this.watermarkingService.awaitRunning();

            // Sweepers handed to the cluster listener further down.
            TaskSweeper taskSweeper = new TaskSweeper(taskMetadataStore, host.getHostId(), this.controllerExecutor, this.streamMetadataTasks);
            TxnSweeper txnSweeper = new TxnSweeper(this.streamStore, this.streamTransactionMetadataTasks, this.serviceConfig.getTimeoutServiceConfig().getMaxLeaseValue(), this.controllerExecutor);
            RequestSweeper requestSweeper = new RequestSweeper(this.streamStore, this.controllerExecutor, this.streamMetadataTasks);
            if (this.serviceConfig.isControllerClusterListenerEnabled()) {
                this.cluster = new ClusterZKImpl((CuratorFramework)this.storeClient.getClient(), "controllers");
            }

            this.streamMetrics = new StreamMetrics();
            this.transactionMetrics = new TransactionMetrics();
            this.controllerService = new ControllerService(this.streamStore, bucketStore, this.streamMetadataTasks, this.streamTransactionMetadataTasks, this.segmentHelper, this.controllerExecutor, this.cluster, this.streamMetrics, this.transactionMetrics);
            // Publishing the local controller also releases controllerReadyLatch.
            this.setController(new LocalController(this.controllerService, grpcServerConfig.isAuthorizationEnabled(), grpcServerConfig.getTokenSigningKey()));

            // Event processors are bootstrapped asynchronously and awaited near the end.
            CompletableFuture<Void> eventProcessorFuture = CompletableFuture.completedFuture(null);
            if (this.serviceConfig.getEventProcessorConfig().isPresent()) {
                this.controllerEventProcessors = new ControllerEventProcessors(host.getHostId(), this.serviceConfig.getEventProcessorConfig().get(), this.localController, checkpointStore, this.streamStore, bucketStore, this.connectionFactory, this.streamMetadataTasks, this.streamTransactionMetadataTasks, this.controllerExecutor);
                log.info("Starting event processors");
                eventProcessorFuture = this.controllerEventProcessors.bootstrap(this.streamTransactionMetadataTasks, this.streamMetadataTasks)
                        .thenAcceptAsync(x -> this.controllerEventProcessors.startAsync(), this.controllerExecutor);
            }

            if (this.serviceConfig.isControllerClusterListenerEnabled()) {
                ArrayList<FailoverSweeper> failoverSweepers = new ArrayList<FailoverSweeper>();
                failoverSweepers.add(taskSweeper);
                failoverSweepers.add(txnSweeper);
                failoverSweepers.add(requestSweeper);
                if (this.serviceConfig.getEventProcessorConfig().isPresent()) {
                    assert (this.controllerEventProcessors != null);
                    failoverSweepers.add(this.controllerEventProcessors);
                }
                this.controllerClusterListener = new ControllerClusterListener(host, this.cluster, this.controllerExecutor, failoverSweepers);
                log.info("Starting controller cluster listener");
                this.controllerClusterListener.startAsync();
            }

            if (this.serviceConfig.getGRPCServerConfig().isPresent()) {
                this.grpcServer = new GRPCServer(this.controllerService, grpcServerConfig, requestTracker);
                this.grpcServer.startAsync();
                log.info("Awaiting start of rpc server");
                this.grpcServer.awaitRunning();
            }
            if (this.serviceConfig.getRestServerConfig().isPresent()) {
                // The REST server reuses the auth handler manager owned by the gRPC server.
                this.restServer = new RESTServer(this.localController, this.controllerService, this.grpcServer.getAuthHandlerManager(), this.serviceConfig.getRestServerConfig().get(), this.connectionFactory);
                this.restServer.startAsync();
                log.info("Awaiting start of REST server");
                this.restServer.awaitRunning();
            }

            if (this.serviceConfig.getEventProcessorConfig().isPresent()) {
                log.info("Awaiting start of controller event processors");
                // Stop waiting as soon as either the processors are running or the store
                // client failure future has been completed (see notifySessionExpiration).
                CompletableFuture.anyOf(this.storeClientFailureFuture, eventProcessorFuture.thenAccept(x -> this.controllerEventProcessors.awaitRunning())).join();
            }
            if (this.serviceConfig.isControllerClusterListenerEnabled()) {
                log.info("Awaiting start of controller cluster listener");
                this.controllerClusterListener.awaitRunning();
            }
        }
        catch (Exception e) {
            log.error("Failed trying to start controller services", e);
            throw e;
        }
        finally {
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceId);
        }
    }

    /**
     * Stops the started components, waits for them to terminate, and then shuts down
     * the executors and closes the remaining resources.
     *
     * <p>All services are asked to stop first and only awaited afterwards. The executor
     * shutdown and resource closing run in a {@code finally} block, so they happen
     * whether or not the stop/await phase throws.
     *
     * @throws Exception if stopping or awaiting any component fails; it is logged and rethrown
     */
    @Override
    protected void shutDown() throws Exception {
        long traceId = LoggerHelpers.traceEnterWithContext(log, this.objectId, "shutDown");
        log.info("Initiating controller service shutDown");
        try {
            // Phase 1: request an asynchronous stop of every service that was started.
            if (this.restServer != null) {
                this.restServer.stopAsync();
            }
            if (this.grpcServer != null) {
                this.grpcServer.stopAsync();
            }
            if (this.controllerEventProcessors != null) {
                log.info("Stopping controller event processors");
                this.controllerEventProcessors.stopAsync();
            }
            if (this.monitor != null) {
                log.info("Stopping the segment container monitor");
                this.monitor.stopAsync();
            }
            if (this.controllerClusterListener != null) {
                log.info("Stopping controller cluster listener");
                this.controllerClusterListener.stopAsync();
                log.info("Controller cluster listener shutdown");
            }
            if (this.retentionService != null) {
                log.info("Stopping auto retention service");
                this.retentionService.stopAsync();
            }
            if (this.watermarkingService != null) {
                log.info("Stopping watermarking service");
                this.watermarkingService.stopAsync();
            }
            log.info("Closing stream metadata tasks");
            this.streamMetadataTasks.close();
            log.info("Closing stream transaction metadata tasks");
            this.streamTransactionMetadataTasks.close();

            // Phase 2: wait for each of those services to reach a terminal state.
            if (this.restServer != null) {
                log.info("Awaiting termination of REST server");
                this.restServer.awaitTerminated();
            }
            if (this.grpcServer != null) {
                log.info("Awaiting termination of gRPC server");
                this.grpcServer.awaitTerminated();
            }
            if (this.controllerEventProcessors != null) {
                log.info("Awaiting termination of controller event processors");
                this.controllerEventProcessors.awaitTerminated();
            }
            if (this.monitor != null) {
                log.info("Awaiting termination of segment container monitor");
                this.monitor.awaitTerminated();
            }
            if (this.controllerClusterListener != null) {
                log.info("Awaiting termination of controller cluster listener");
                this.controllerClusterListener.awaitTerminated();
            }
            if (this.retentionService != null) {
                log.info("Awaiting termination of auto retention");
                this.retentionService.awaitTerminated();
            }
            if (this.watermarkingService != null) {
                log.info("Awaiting termination of watermarking service");
                this.watermarkingService.awaitTerminated();
            }
        }
        catch (Exception e) {
            log.error("Controller Service Starter threw exception during shutdown", e);
            throw e;
        }
        finally {
            // Phase 3: always stop the executors and close the remaining resources.
            log.info("Stopping controller executor");
            ExecutorServiceHelpers.shutdown(Duration.ofSeconds(5L), this.controllerExecutor, this.retentionExecutor, this.watermarkingExecutor);
            if (this.cluster != null) {
                log.info("Closing controller cluster instance");
                this.cluster.close();
            }
            if (this.segmentHelper != null) {
                log.info("closing segment helper");
                this.segmentHelper.close();
            }
            log.info("Closing connection factory");
            this.connectionFactory.close();
            log.info("Closing storeClient");
            this.storeClient.close();
            log.info("Closing store");
            this.streamStore.close();
            if (this.streamMetrics != null) {
                this.streamMetrics.close();
            }
            if (this.transactionMetrics != null) {
                this.transactionMetrics.close();
            }
            log.info("Finishing controller service shutDown");
            LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceId);
        }
    }

    /**
     * Signals that the store client session has expired by completing the store client
     * failure future exceptionally with a {@link ZkSessionExpirationException}. This
     * releases {@link #startUp()} if it is waiting for the event processors to start.
     */
    void notifySessionExpiration() {
        assert (this.storeClient.getType().equals(StoreType.Zookeeper) || this.storeClient.getType().equals(StoreType.PravegaTable));
        this.storeClientFailureFuture.completeExceptionally(new ZkSessionExpirationException("Zookeeper Session Expired"));
    }

    /**
     * Waits for the local controller to be published and then for the transaction
     * metadata tasks to report initialization.
     *
     * <p>The timeout applies only to the second wait; the wait for the controller to be
     * published is unbounded.
     *
     * @param timeout  maximum time to wait for the transaction metadata tasks
     * @param timeUnit unit of {@code timeout}
     * @return the result of {@code StreamTransactionMetadataTasks.awaitInitialization}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @VisibleForTesting
    public boolean awaitTasksModuleInitialization(long timeout, TimeUnit timeUnit) throws InterruptedException {
        this.controllerReadyLatch.await();
        return this.streamTransactionMetadataTasks.awaitInitialization(timeout, timeUnit);
    }

    /**
     * Returns the controller service, blocking until the local controller has been published.
     *
     * @return the controller service created during start-up
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @VisibleForTesting
    public ControllerService getControllerService() throws InterruptedException {
        this.controllerReadyLatch.await();
        return this.controllerService;
    }

    /**
     * Returns the local controller, blocking until it has been published.
     *
     * @return the local controller created during start-up
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @VisibleForTesting
    public LocalController getController() throws InterruptedException {
        this.controllerReadyLatch.await();
        return this.localController;
    }

    /** Stores the local controller and releases every thread waiting on the ready latch. */
    private void setController(LocalController controller) {
        this.localController = controller;
        this.controllerReadyLatch.countDown();
    }

    /**
     * Resolves the host name advertised for this controller: the published RPC host if
     * one is configured, otherwise the local host address, otherwise {@code "localhost"}.
     */
    private String getHostName() {
        String hostName = null;
        Optional<GRPCServerConfig> grpcConfig = this.serviceConfig.getGRPCServerConfig();
        if (grpcConfig.isPresent()) {
            hostName = grpcConfig.get().getPublishedRPCHost().orElse(null);
        }
        if (StringUtils.isEmpty(hostName)) {
            try {
                hostName = InetAddress.getLocalHost().getHostAddress();
            }
            catch (UnknownHostException e) {
                // The exception is passed as the Throwable argument, so no placeholder is used.
                log.warn("Failed to get host address, defaulting to localhost", e);
                hostName = "localhost";
            }
        }
        return hostName;
    }

    /**
     * Resolves the port advertised for this controller: the published RPC port if one is
     * configured, otherwise the gRPC listening port, or 0 when gRPC is not configured.
     */
    private int getPort() {
        int port = 0;
        Optional<GRPCServerConfig> grpcConfig = this.serviceConfig.getGRPCServerConfig();
        if (grpcConfig.isPresent()) {
            port = grpcConfig.get().getPublishedRPCPort().orElse(grpcConfig.get().getPort());
        }
        return port;
    }

    /** Returns the future that {@link #notifySessionExpiration()} completes exceptionally. */
    @SuppressFBWarnings(justification="generated code")
    CompletableFuture<Void> getStoreClientFailureFuture() {
        return this.storeClientFailureFuture;
    }
}

