/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.stream.server;

import java.io.File;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.pulsar.shade.com.beust.jcommander.JCommander;
import org.apache.pulsar.shade.com.beust.jcommander.Parameter;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.component.ComponentInfoPublisher;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.component.LifecycleComponentStack;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.pulsar.shade.org.apache.bookkeeper.server.http.BKHttpServiceProvider;
import org.apache.pulsar.shade.org.apache.bookkeeper.server.service.HttpService;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.ExitCode;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.conf.DLConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.grpc.GrpcServerSpec;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.service.BookieService;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.service.BookieWatchService;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.service.ClusterControllerService;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.service.CuratorProviderService;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.service.DLNamespaceProviderService;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.service.GrpcService;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.service.RegistrationServiceProvider;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.service.RegistrationStateService;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.service.StatsProviderService;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.service.StorageService;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.StorageContainerStoreBuilder;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.StorageResources;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl;
import org.apache.pulsar.shade.org.apache.commons.configuration.CompositeConfiguration;
import org.apache.pulsar.shade.org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.shade.org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StorageServer {
    private static final Logger log = LoggerFactory.getLogger(StorageServer.class);

    private static void loadConfFile(CompositeConfiguration conf, String confFile) throws IllegalArgumentException {
        try {
            PropertiesConfiguration loadedConf = new PropertiesConfiguration(new File(confFile).toURI().toURL());
            conf.addConfiguration(loadedConf);
        }
        catch (MalformedURLException e) {
            log.error("Could not open configuration file {}", (Object)confFile, (Object)e);
            throw new IllegalArgumentException("Could not open configuration file " + confFile, e);
        }
        catch (ConfigurationException e) {
            log.error("Malformed configuration file {}", (Object)confFile, (Object)e);
            throw new IllegalArgumentException("Malformed configuration file " + confFile, e);
        }
        log.info("Loaded configuration file {}", (Object)confFile);
    }

    public static Endpoint createLocalEndpoint(int port, boolean useHostname) throws UnknownHostException {
        log.warn("Determining hostname for stream storage");
        String hostname = useHostname ? InetAddress.getLocalHost().getCanonicalHostName() : InetAddress.getLocalHost().getHostAddress();
        log.warn("Decided to use hostname {}", (Object)hostname);
        return Endpoint.newBuilder().setHostname(hostname).setPort(port).build();
    }

    public static void main(String[] args) {
        int retCode = StorageServer.doMain(args);
        Runtime.getRuntime().exit(retCode);
    }

    static int doMain(String[] args) {
        LifecycleComponent storageServer;
        Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> log.error("Uncaught exception in thread {}: {}", (Object)thread.getName(), (Object)exception.getMessage()));
        ServerArguments arguments = new ServerArguments();
        JCommander jCommander = new JCommander(arguments);
        jCommander.setProgramName("StorageServer");
        jCommander.parse(args);
        if (arguments.help) {
            jCommander.usage();
            return ExitCode.INVALID_CONF.code();
        }
        CompositeConfiguration conf = new CompositeConfiguration();
        if (null != arguments.serverConfigFile) {
            StorageServer.loadConfFile(conf, arguments.serverConfigFile);
        }
        int grpcPort = arguments.port;
        boolean grpcUseHostname = arguments.useHostname;
        try {
            storageServer = StorageServer.buildStorageServer(conf, grpcPort, grpcUseHostname);
        }
        catch (Exception e) {
            log.error("Invalid storage configuration", (Throwable)e);
            return ExitCode.INVALID_CONF.code();
        }
        CompletableFuture<Void> liveFuture = ComponentStarter.startComponent(storageServer);
        try {
            liveFuture.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.info("Storage server is interrupted. Exiting ...");
        }
        catch (ExecutionException e) {
            log.info("Storage server is exiting ...");
        }
        return ExitCode.OK.code();
    }

    public static LifecycleComponent buildStorageServer(CompositeConfiguration conf, int grpcPort) throws Exception {
        return StorageServer.buildStorageServer(conf, grpcPort, false, true, NullStatsLogger.INSTANCE);
    }

    public static LifecycleComponent buildStorageServer(CompositeConfiguration conf, int grpcPort, boolean useHostname) throws Exception {
        return StorageServer.buildStorageServer(conf, grpcPort, false, useHostname, NullStatsLogger.INSTANCE);
    }

    public static LifecycleComponent buildStorageServer(CompositeConfiguration conf, int grpcPort, boolean useHostname, boolean startBookieAndStartProvider, StatsLogger externalStatsLogger) throws Exception {
        ServerConfiguration bkServerConf;
        StatsLogger rootStatsLogger;
        ComponentInfoPublisher componentInfoPublisher = new ComponentInfoPublisher();
        Supplier<BookieServiceInfo> bookieServiceInfoProvider = () -> StorageServer.buildBookieServiceInfo(componentInfoPublisher);
        LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder().withName("storage-server").withComponentInfoPublisher(componentInfoPublisher);
        BookieConfiguration bkConf = BookieConfiguration.of(conf);
        bkConf.validate();
        DLConfiguration dlConf = DLConfiguration.of(conf);
        dlConf.validate();
        StorageServerConfiguration serverConf = StorageServerConfiguration.of(conf);
        serverConf.validate();
        StorageConfiguration storageConf = new StorageConfiguration(conf);
        storageConf.validate();
        Endpoint myEndpoint = StorageServer.createLocalEndpoint(grpcPort, useHostname);
        StorageResources storageResources = StorageResources.create();
        StatsProviderService statsProviderService = null;
        if (startBookieAndStartProvider) {
            statsProviderService = new StatsProviderService(bkConf);
            rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
            serverBuilder.addComponent(statsProviderService);
            log.info("Bookie configuration : {}", (Object)bkConf.asJson());
        } else {
            rootStatsLogger = Preconditions.checkNotNull(externalStatsLogger, "External stats logger is not provided while not starting stats provider");
        }
        log.info("Dlog configuration : {}", (Object)dlConf.asJson());
        log.info("Storage configuration : {}", (Object)storageConf.asJson());
        log.info("Server configuration : {}", (Object)serverConf.asJson());
        if (startBookieAndStartProvider) {
            BookieService bookieService = new BookieService(bkConf, rootStatsLogger, bookieServiceInfoProvider);
            serverBuilder.addComponent(bookieService);
            bkServerConf = bookieService.serverConf();
            if (bkServerConf.isHttpServerEnabled()) {
                BKHttpServiceProvider provider = new BKHttpServiceProvider.Builder().setBookieServer(bookieService.getServer()).setServerConfiguration(bkServerConf).setStatsProvider(statsProviderService.getStatsProvider()).build();
                HttpService httpService = new HttpService(provider, new org.apache.pulsar.shade.org.apache.bookkeeper.server.conf.BookieConfiguration(bkServerConf), rootStatsLogger);
                serverBuilder.addComponent(httpService);
                log.info("Load lifecycle component : {}", (Object)HttpService.class.getName());
            }
        } else {
            bkServerConf = new ServerConfiguration();
            bkServerConf.loadConf(bkConf.getUnderlyingConf());
        }
        DistributedLogConfiguration dlogConf = new DistributedLogConfiguration();
        dlogConf.loadConf(dlConf);
        BookieWatchService bkWatchService = new BookieWatchService(dlogConf.getEnsembleSize(), bkConf, (StatsLogger)NullStatsLogger.INSTANCE);
        CuratorProviderService curatorProviderService = new CuratorProviderService(bkServerConf, dlConf, rootStatsLogger.scope("curator"));
        DLNamespaceProviderService dlNamespaceProvider = new DLNamespaceProviderService(bkServerConf, dlConf, rootStatsLogger.scope("dlog"));
        StorageClientSettings proxyClientSettings = StorageClientSettings.newBuilder().serviceUri("bk://localhost:" + grpcPort).build();
        StorageContainerStoreBuilder storageContainerStoreBuilder = StorageContainerStoreBuilder.newBuilder().withStatsLogger(rootStatsLogger.scope("storage")).withStorageConfiguration(storageConf).withStorageResources(storageResources).withStorageContainerPlacementPolicyFactory(() -> {
            long numStorageContainers;
            try (ZkClusterMetadataStore store = new ZkClusterMetadataStore(curatorProviderService.get(), ZKMetadataDriverBase.resolveZkServers(bkServerConf), "/stream");){
                numStorageContainers = store.getClusterMetadata().getNumStorageContainers();
            }
            return StorageContainerPlacementPolicyImpl.of((int)numStorageContainers);
        }).withDefaultBackendUri(dlNamespaceProvider.getDlogUri()).withStorageContainerManagerFactory((storeConf, registry) -> new ZkStorageContainerManager(myEndpoint, storageConf, new ZkClusterMetadataStore(curatorProviderService.get(), ZKMetadataDriverBase.resolveZkServers(bkServerConf), "/stream"), registry, rootStatsLogger.scope("sc").scope("manager"))).withRangeStoreFactory(new MVCCStoreFactoryImpl(dlNamespaceProvider, () -> new DLCheckpointStore(dlNamespaceProvider.get()), storageConf.getRangeStoreDirs(), storageResources, storageConf.getServeReadOnlyTables(), storageConf)).withStorageServerClientManager(() -> new StorageServerClientManagerImpl(proxyClientSettings, storageResources.scheduler(), StorageServerChannel.factory(proxyClientSettings).andThen(channel -> channel.intercept(new RoutingHeaderProxyInterceptor()))));
        StorageService storageService = new StorageService(storageConf, storageContainerStoreBuilder, rootStatsLogger.scope("storage"));
        StatsLogger rpcStatsLogger = rootStatsLogger.scope("grpc");
        GrpcServerSpec serverSpec = GrpcServerSpec.builder().storeSupplier(storageService).storeServerConf(serverConf).endpoint(myEndpoint).statsLogger(rpcStatsLogger).build();
        GrpcService grpcService = new GrpcService(serverConf, serverSpec, rpcStatsLogger);
        RegistrationServiceProvider regService = new RegistrationServiceProvider(bkServerConf, dlConf, rootStatsLogger.scope("registration").scope("provider"));
        RegistrationStateService regStateService = new RegistrationStateService(myEndpoint, bkServerConf, bkConf, regService, rootStatsLogger.scope("registration"));
        ClusterControllerService clusterControllerService = new ClusterControllerService(storageConf, () -> new ClusterControllerImpl(new ZkClusterMetadataStore(curatorProviderService.get(), ZKMetadataDriverBase.resolveZkServers(bkServerConf), "/stream"), regService.get(), new DefaultStorageContainerController(), new ZkClusterControllerLeaderSelector(curatorProviderService.get(), "/stream"), storageConf), rootStatsLogger.scope("cluster_controller"));
        return serverBuilder.addComponent(bkWatchService).addComponent(curatorProviderService).addComponent(dlNamespaceProvider).addComponent(storageService).addComponent(grpcService).addComponent(regService).addComponent(regStateService).addComponent(clusterControllerService).build();
    }

    private static BookieServiceInfo buildBookieServiceInfo(ComponentInfoPublisher componentInfoPublisher) {
        List<BookieServiceInfo.Endpoint> endpoints = componentInfoPublisher.getEndpoints().values().stream().map(e -> new BookieServiceInfo.Endpoint(e.getId(), e.getPort(), e.getHost(), e.getProtocol(), e.getAuth(), e.getExtensions())).collect(Collectors.toList());
        return new BookieServiceInfo(componentInfoPublisher.getProperties(), endpoints);
    }

    private static class ServerArguments {
        @Parameter(names={"-c", "--conf"}, description="Configuration file for storage server")
        private String serverConfigFile;
        @Parameter(names={"-p", "--port"}, description="Port to listen on for gPRC server")
        private int port = 4181;
        @Parameter(names={"-u", "--useHostname"}, description="Use hostname instead of IP for server ID")
        private boolean useHostname = false;
        @Parameter(names={"-h", "--help"}, description="Show this help message")
        private boolean help = false;

        private ServerArguments() {
        }
    }
}

