/*
 * Decompiled with CFR 0.152.
 */
package blobit.server;

import blobit.server.NetworkUtils;
import blobit.server.ServerConfiguration;
import blobit.server.ServletHttpServer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.common.component.Lifecycle;
import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.server.EmbeddedServer;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider;

/**
 * Runs an Apache BookKeeper bookie inside the current JVM.
 *
 * <p>The bookie configuration is derived from the BlobIt {@link ServerConfiguration}: a fixed set
 * of defaults is applied first, then every key starting with {@code bookie.} is copied (without
 * the prefix) into the BookKeeper configuration. All on-disk state lives under
 * {@code <baseDirectory>/bookie}.
 *
 * <p>Instances are not synchronized; {@link #start()} and {@link #close()} are expected to be
 * called from a single controlling thread.
 */
public class EmbeddedBookie
implements AutoCloseable {
    private static final Logger LOG = Logger.getLogger(EmbeddedBookie.class.getName());

    /** Name of the file (inside the bookie directory) that remembers an auto-assigned port. */
    private static final String BOOKIE_PORT_FILE = "bookie_port";
    /** Prefix of the BlobIt configuration keys that are forwarded verbatim to BookKeeper. */
    private static final String BOOKIE_PROPERTY_PREFIX = "bookie.";
    /** Number of polls performed while waiting for a lifecycle state. */
    private static final int STATE_POLL_ATTEMPTS = 100;
    /** Pause between two lifecycle state polls, in milliseconds. */
    private static final long STATE_POLL_INTERVAL_MS = 500L;

    private final ServerConfiguration configuration;
    private final Path baseDirectory;
    private EmbeddedServer embeddedServer;
    private StatsProvider statsProvider;

    /**
     * Creates a bookie wrapper; nothing is started until {@link #start()} is called.
     *
     * @param baseDirectory directory under which the {@code bookie} working directory is created
     * @param configuration BlobIt server configuration to read settings from
     */
    public EmbeddedBookie(Path baseDirectory, ServerConfiguration configuration) {
        this.configuration = configuration;
        this.baseDirectory = baseDirectory;
    }

    /**
     * Builds the BookKeeper configuration, formats metadata/local storage when requested,
     * then builds and starts the embedded server and waits for the bookie service to report
     * {@code STARTED}.
     *
     * @throws Exception if the configuration cannot be written, formatting fails or the
     *         server cannot be built or started
     */
    public void start() throws Exception {
        org.apache.bookkeeper.conf.ServerConfiguration conf = new org.apache.bookkeeper.conf.ServerConfiguration();
        conf.setZkTimeout(this.configuration.getInt("server.zookeeper.session.timeout", 40000));
        String zkServers = this.configuration.getString("server.zookeeper.address", "localhost:2181");
        String zkLedgersRootPath = this.configuration.getString("server.bookkeeper.zk.ledgers.root.path", "/ledgers");
        // The address list is comma separated in our configuration; the URI uses ';' instead.
        String metadataServiceUri = "zk+null://" + zkServers.replace(",", ";") + zkLedgersRootPath;
        LOG.log(Level.INFO, "Embeeded Bookie will use metadataServiceUri: {0}", metadataServiceUri);
        conf.setMetadataServiceUri(metadataServiceUri);
        conf.setStatisticsEnabled(true);
        conf.setProperty("codahaleStatsJmxEndpoint", "BlobIt_Bookie");
        conf.setStatsProviderClass(PrometheusMetricsProvider.class);
        conf.setNumAddWorkerThreads(8);
        conf.setMaxPendingReadRequestPerThread(10000);
        conf.setMaxPendingAddRequestPerThread(20000);
        conf.setJournalSyncData(false);
        conf.setUseHostNameAsBookieID(true);

        Path bookieDir = this.baseDirectory.resolve("bookie");
        int port = this.resolveBookiePort(bookieDir);
        conf.setBookiePort(port);

        Files.createDirectories(bookieDir);
        Path bookieDataDir = bookieDir.resolve("bookie_data").toAbsolutePath();
        Path bookieJournalDir = bookieDir.resolve("bookie_journal").toAbsolutePath();
        Files.createDirectories(bookieDataDir);
        Files.createDirectories(bookieJournalDir);
        conf.setLedgerDirNames(new String[]{bookieDataDir.toString()});
        conf.setJournalDirName(bookieJournalDir.toString());
        conf.setFlushInterval(1000);
        conf.setMaxBackupJournals(5);
        conf.setMaxJournalSizeMB(1048L);
        conf.setEnableLocalTransport(true);
        conf.setProperty("journalMaxGroupWaitMSec", 10L);
        conf.setJournalFlushWhenQueueEmpty(true);
        conf.setAutoRecoveryDaemonEnabled(true);
        conf.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class);
        conf.setHttpServerEnabled(true);
        conf.setProperty("httpServerClass", ServletHttpServer.class.getName());
        conf.setProperty("prometheusStatsHttpEnable", "false");

        // User supplied "bookie.*" entries are applied last, on top of the defaults above.
        for (String key : this.configuration.keys()) {
            if (!key.startsWith(BOOKIE_PROPERTY_PREFIX)) {
                continue;
            }
            String bookieConf = key.substring(BOOKIE_PROPERTY_PREFIX.length());
            String value = this.configuration.getString(key, null);
            conf.addProperty(bookieConf, value);
            LOG.log(Level.CONFIG, "config {0} remapped to {1}={2}", new Object[]{key, bookieConf, value});
        }

        long bootStart = System.currentTimeMillis();
        LOG.severe("Booting Apache Bookkeeper on port " + port);
        this.dumpBookieConfiguration(bookieDir, conf);
        this.formatIfRequested(conf);

        Class<?> statsProviderClass = conf.getStatsProviderClass();
        this.statsProvider = (StatsProvider) ReflectionUtils.newInstance(statsProviderClass);
        // The provider is stopped in close(), so it is started here to keep the pair balanced.
        // NOTE(review): assumes EmbeddedServer does not start a provider supplied by the caller.
        this.statsProvider.start(conf);
        LOG.log(Level.INFO, "Bookie httpServerEnabled:{0}", conf.isHttpServerEnabled());

        BookieConfiguration bkConf = new BookieConfiguration(conf);
        this.embeddedServer = EmbeddedServer.builder(bkConf).statsProvider(this.statsProvider).build();
        // build() only assembles the components; without this call the bookie never leaves
        // its initial state and the wait below can only time out.
        this.embeddedServer.getLifecycleComponentStack().start();
        if (this.waitForBookieServiceState(Lifecycle.State.STARTED)) {
            LOG.info("bookie started");
        } else {
            LOG.warning("bookie start timed out");
        }
        long bootEnd = System.currentTimeMillis();
        LOG.severe("Booting Apache Bookkeeper finished. Time " + (bootEnd - bootStart) + " ms");
    }

    /**
     * Returns the configured port or, when {@code server.bookkeeper.port} is not positive,
     * the port remembered in the bookie directory (assigning and persisting a free one on
     * first use).
     */
    private int resolveBookiePort(Path bookieDir) throws IOException {
        int port = this.configuration.getInt("server.bookkeeper.port", 3181);
        if (port > 0) {
            return port;
        }
        Integer assigned = this.readLocalBookiePort(bookieDir);
        if (assigned == null) {
            assigned = NetworkUtils.assignFirstFreePort();
            LOG.log(Level.SEVERE, "As configuration parameter server.bookkeeper.port is {0},I have choosen to listen on port {1}. Set to a positive number in order to use a fixed port", new Object[]{Integer.toString(port), Integer.toString(assigned)});
            this.persistLocalBookiePort(bookieDir, assigned);
        }
        return assigned;
    }

    /**
     * Runs the metadata format (always attempted, forced only with {@code bookie.forcemetaformat})
     * and, when {@code bookie.forceformat} is set, the local bookie format.
     */
    private void formatIfRequested(org.apache.bookkeeper.conf.ServerConfiguration conf) throws Exception {
        boolean forcemetaformat = this.configuration.getBoolean("bookie.forcemetaformat", false);
        LOG.log(Level.CONFIG, "bookie.forcemetaformat={0}", forcemetaformat);
        if (BookKeeperAdmin.format(conf, false, forcemetaformat)) {
            LOG.info("BookKeeperAdmin.format: created a new workspace on ZK");
        } else {
            LOG.info("BookKeeperAdmin.format: ZK space does not need an format operation");
        }
        boolean forceformat = this.configuration.getBoolean("bookie.forceformat", false);
        LOG.log(Level.CONFIG, "bookie.forceformat={0}", forceformat);
        if (forceformat) {
            if (BookieImpl.format(conf, false, forceformat)) {
                LOG.info("Bookie.format: formatter applied to local bookie");
            } else {
                LOG.info("Bookie.format: local boookie did not need formatting");
            }
        }
    }

    /**
     * Writes the effective BookKeeper configuration as {@code key=value} lines to
     * {@code embedded.bookie.properties} inside the bookie directory, replacing any previous dump.
     * Collection values are rendered as a comma separated list.
     */
    private void dumpBookieConfiguration(Path bookie_dir, org.apache.bookkeeper.conf.ServerConfiguration conf) throws IOException {
        Path dumpFile = bookie_dir.resolve("embedded.bookie.properties");
        StringBuilder builder = new StringBuilder();
        Iterator<?> keys = conf.getKeys();
        while (keys.hasNext()) {
            String key = (String) keys.next();
            Object value = conf.getProperty(key);
            if (value instanceof Collection) {
                value = ((Collection<?>) value).stream().map(String::valueOf).collect(Collectors.joining(","));
            }
            builder.append(key).append('=').append(value).append('\n');
        }
        Files.write(dumpFile, builder.toString().getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
        LOG.severe("Dumped actual Bookie configuration to " + dumpFile.toAbsolutePath());
    }

    /**
     * Stops the embedded server (if one was built), waits for the bookie service to report
     * {@code STOPPED}, releases the server resources and finally stops the stats provider.
     * Calling this method more than once is harmless.
     */
    @Override
    public void close() {
        if (this.embeddedServer != null) {
            LOG.info("Apache Bookkeeper stopping");
            try {
                // Request the shutdown explicitly: merely polling the state would never
                // observe STOPPED on a running server.
                this.embeddedServer.getLifecycleComponentStack().stop();
                if (this.waitForBookieServiceState(Lifecycle.State.STOPPED)) {
                    LOG.info("bookie stopped");
                } else {
                    LOG.warning("bookie stop timed out");
                }
            }
            catch (InterruptedException err) {
                Thread.currentThread().interrupt();
            }
            finally {
                try {
                    this.embeddedServer.getLifecycleComponentStack().close();
                }
                finally {
                    this.embeddedServer = null;
                }
            }
        }
        if (this.statsProvider != null) {
            try {
                this.statsProvider.stop();
            }
            finally {
                // Cleared so that a repeated close() does not stop the provider twice.
                this.statsProvider = null;
            }
        }
    }

    /**
     * Reads the bookie port previously stored by {@link #persistLocalBookiePort(Path, int)}.
     *
     * <p>Lines are trimmed; blank lines and lines starting with {@code #} are skipped and the
     * first remaining line is parsed as the port.
     *
     * @param dataPath directory expected to contain the {@code bookie_port} file
     * @return the stored port, or {@code null} when the file does not exist
     * @throws IOException if the file cannot be read, contains no port line, or the port line
     *         is not a valid integer
     */
    public Integer readLocalBookiePort(Path dataPath) throws IOException {
        Path file = dataPath.resolve(BOOKIE_PORT_FILE);
        try {
            LOG.log(Level.SEVERE, "Looking for local port into file {0}", file);
            if (!Files.isRegularFile(file)) {
                LOG.log(Level.SEVERE, "Cannot find file {0}", file);
                return null;
            }
            for (String rawLine : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                String line = rawLine.trim();
                if (line.isEmpty() || line.startsWith("#")) {
                    continue;
                }
                int res;
                try {
                    res = Integer.parseInt(line);
                }
                catch (NumberFormatException invalid) {
                    // Surface corrupted content through the documented exception type.
                    throw new IOException("Invalid port '" + line + "' inside file " + file.toAbsolutePath(), invalid);
                }
                LOG.log(Level.SEVERE, "Found local port {0} into file {1}", new Object[]{Integer.toString(res), file});
                return res;
            }
            throw new IOException("Cannot find any valid line inside file " + file.toAbsolutePath());
        }
        catch (IOException error) {
            LOG.log(Level.SEVERE, "Error while reading file " + file.toAbsolutePath(), error);
            throw error;
        }
    }

    /**
     * Stores the given port in a new {@code bookie_port} file inside {@code dataPath},
     * creating the directory when needed.
     *
     * @param dataPath directory that will contain the port file
     * @param port port to remember
     * @throws IOException if the directory cannot be created, the file already exists or it
     *         cannot be written
     */
    public void persistLocalBookiePort(Path dataPath, int port) throws IOException {
        Files.createDirectories(dataPath);
        Path file = dataPath.resolve(BOOKIE_PORT_FILE);
        StringBuilder message = new StringBuilder();
        message.append("# This file contains the port of the bookie used by this node\n");
        message.append("# Do not change the contents of this file, otherwise the beheaviour of the system will\n");
        message.append("# lead eventually to data loss\n");
        message.append("# \n");
        message.append("# Any line which starts with '#' and and blank line will be ignored\n");
        message.append("# The system will consider the first non-blank line as port\n");
        message.append("\n\n");
        message.append(port);
        // CREATE_NEW: an already persisted port must never be silently overwritten.
        Files.write(file, message.toString().getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE_NEW);
    }

    /**
     * Polls the bookie service until it reports {@code expectedState}.
     *
     * @return {@code true} if the state was observed, {@code false} if every poll attempt
     *         was used up first
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private boolean waitForBookieServiceState(Lifecycle.State expectedState) throws InterruptedException {
        for (int attempt = 0; attempt < STATE_POLL_ATTEMPTS; ++attempt) {
            Lifecycle.State currentState = this.embeddedServer.getBookieService().lifecycleState();
            if (currentState == expectedState) {
                return true;
            }
            Thread.sleep(STATE_POLL_INTERVAL_MS);
        }
        return false;
    }
}

