/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.timeline.service;

import io.javalin.Javalin;
import io.javalin.core.util.JettyServerUtil;
import java.io.IOException;
import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.org.eclipse.jetty.server.Server;
import org.apache.hudi.org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.apache.hudi.timeline.service.RequestHandler;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class TimelineService {
    private static final Logger LOG = LogManager.getLogger(TimelineService.class);
    private static final int START_SERVICE_MAX_RETRIES = 16;
    private static final int DEFAULT_NUM_THREADS = -1;
    private int serverPort;
    private Config timelineServerConf;
    private Configuration conf;
    private transient HoodieEngineContext context;
    private transient FileSystem fs;
    private transient Javalin app = null;
    private transient FileSystemViewManager fsViewsManager;
    private transient RequestHandler requestHandler;

    public int getServerPort() {
        return this.serverPort;
    }

    public TimelineService(HoodieEngineContext context, Configuration hadoopConf, Config timelineServerConf, FileSystem fileSystem, FileSystemViewManager globalFileSystemViewManager) throws IOException {
        this.conf = FSUtils.prepareHadoopConf(hadoopConf);
        this.timelineServerConf = timelineServerConf;
        this.serverPort = timelineServerConf.serverPort;
        this.context = context;
        this.fs = fileSystem;
        this.fsViewsManager = globalFileSystemViewManager;
    }

    private int startServiceOnPort(int port) throws IOException {
        if (port != 0 && (1024 > port || port >= 65536)) {
            throw new IllegalArgumentException(String.format("startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port. but now is %s.", port));
        }
        for (int attempt = 0; attempt < 16; ++attempt) {
            int tryPort = port == 0 ? port : (port + attempt - 1024) % 64512 + 1024;
            try {
                this.app.start(tryPort);
                return this.app.port();
            }
            catch (Exception e) {
                if (e.getMessage() != null && e.getMessage().contains("Failed to bind to")) {
                    if (tryPort == 0) {
                        LOG.warn((Object)"Timeline server could not bind on a random free port.");
                        continue;
                    }
                    LOG.warn((Object)String.format("Timeline server could not bind on port %d. Attempting port %d + 1.", tryPort, tryPort));
                    continue;
                }
                LOG.warn((Object)String.format("Timeline server start failed on port %d. Attempting port %d + 1.", tryPort, tryPort), (Throwable)e);
                continue;
            }
        }
        throw new IOException(String.format("Timeline server start failed on port %d, after retry %d times", port, 16));
    }

    public int startService() throws IOException {
        Server server = this.timelineServerConf.numThreads == -1 ? JettyServerUtil.defaultServer() : new Server(new QueuedThreadPool(this.timelineServerConf.numThreads));
        this.app = Javalin.create().server(() -> server);
        if (!this.timelineServerConf.compress) {
            this.app.disableDynamicGzip();
        }
        this.requestHandler = new RequestHandler(this.app, this.conf, this.timelineServerConf, this.context, this.fs, this.fsViewsManager);
        this.app.get("/", ctx -> ctx.result("Hello Hudi"));
        this.requestHandler.register();
        int realServerPort = this.startServiceOnPort(this.serverPort);
        LOG.info((Object)("Starting Timeline server on port :" + realServerPort));
        this.serverPort = realServerPort;
        return realServerPort;
    }

    public void run() throws IOException {
        this.startService();
    }

    public static FileSystemViewManager buildFileSystemViewManager(Config config, SerializableConfiguration conf) {
        HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(conf.get());
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
        HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();
        switch (config.viewStorageType) {
            case MEMORY: {
                FileSystemViewStorageConfig.Builder inMemConfBuilder = FileSystemViewStorageConfig.newBuilder();
                inMemConfBuilder.withStorageType(FileSystemViewStorageType.MEMORY);
                return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, inMemConfBuilder.build(), commonConfig);
            }
            case SPILLABLE_DISK: {
                FileSystemViewStorageConfig.Builder spillableConfBuilder = FileSystemViewStorageConfig.newBuilder();
                spillableConfBuilder.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).withBaseStoreDir(config.baseStorePathForFileGroups).withMaxMemoryForView((long)(config.maxViewMemPerTableInMB * 1024) * 1024L).withMemFractionForPendingCompaction(config.memFractionForCompactionPerTable);
                return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, spillableConfBuilder.build(), commonConfig);
            }
            case EMBEDDED_KV_STORE: {
                FileSystemViewStorageConfig.Builder rocksDBConfBuilder = FileSystemViewStorageConfig.newBuilder();
                rocksDBConfBuilder.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).withRocksDBPath(config.rocksDBPath);
                return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, rocksDBConfBuilder.build(), commonConfig);
            }
        }
        throw new IllegalArgumentException("Invalid view manager storage type :" + (Object)((Object)config.viewStorageType));
    }

    public void close() {
        LOG.info((Object)"Closing Timeline Service");
        if (this.requestHandler != null) {
            this.requestHandler.stop();
        }
        if (this.app != null) {
            this.app.stop();
            this.app = null;
        }
        this.fsViewsManager.close();
        LOG.info((Object)"Closed Timeline Service");
    }

    public Configuration getConf() {
        return this.conf;
    }

    public FileSystem getFs() {
        return this.fs;
    }

    public static void main(String[] args) throws Exception {
        Config cfg = new Config();
        JCommander cmd = new JCommander((Object)cfg, null, args);
        if (cfg.help.booleanValue()) {
            cmd.usage();
            System.exit(1);
        }
        Configuration conf = FSUtils.prepareHadoopConf(new Configuration());
        FileSystemViewManager viewManager = TimelineService.buildFileSystemViewManager(cfg, new SerializableConfiguration(conf));
        TimelineService service2 = new TimelineService(new HoodieLocalEngineContext(FSUtils.prepareHadoopConf(new Configuration())), new Configuration(), cfg, FileSystem.get((Configuration)new Configuration()), viewManager);
        service2.run();
    }

    public static class Config
    implements Serializable {
        @Parameter(names={"--server-port", "-p"}, description=" Server Port")
        public Integer serverPort = 26754;
        @Parameter(names={"--view-storage", "-st"}, description="View Storage Type. Default - SPILLABLE_DISK")
        public FileSystemViewStorageType viewStorageType = FileSystemViewStorageType.SPILLABLE_DISK;
        @Parameter(names={"--max-view-mem-per-table", "-mv"}, description="Maximum view memory per table in MB to be used for storing file-groups. Overflow file-groups will be spilled to disk. Used for SPILLABLE_DISK storage type")
        public Integer maxViewMemPerTableInMB = 2048;
        @Parameter(names={"--mem-overhead-fraction-pending-compaction", "-cf"}, description="Memory Fraction of --max-view-mem-per-table to be allocated for managing pending compaction storage. Overflow entries will be spilled to disk. Used for SPILLABLE_DISK storage type")
        public Double memFractionForCompactionPerTable = 0.001;
        @Parameter(names={"--base-store-path", "-sp"}, description="Directory where spilled view entries will be stored. Used for SPILLABLE_DISK storage type")
        public String baseStorePathForFileGroups = FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue();
        @Parameter(names={"--rocksdb-path", "-rp"}, description="Root directory for RocksDB")
        public String rocksDBPath = FileSystemViewStorageConfig.ROCKSDB_BASE_PATH.defaultValue();
        @Parameter(names={"--threads", "-t"}, description="Number of threads to use for serving requests")
        public int numThreads = -1;
        @Parameter(names={"--async"}, description="Use asyncronous request processing")
        public boolean async = false;
        @Parameter(names={"--compress"}, description="Compress output using gzip")
        public boolean compress = true;
        @Parameter(names={"--enable-marker-requests", "-em"}, description="Enable handling of marker-related requests")
        public boolean enableMarkerRequests = false;
        @Parameter(names={"--marker-batch-threads", "-mbt"}, description="Number of threads to use for batch processing marker creation requests")
        public int markerBatchNumThreads = 20;
        @Parameter(names={"--marker-batch-interval-ms", "-mbi"}, description="The interval in milliseconds between two batch processing of marker creation requests")
        public long markerBatchIntervalMs = 50L;
        @Parameter(names={"--marker-parallelism", "-mdp"}, description="Parallelism to use for reading and deleting marker files")
        public int markerParallelism = 100;
        @Parameter(names={"--refreshTimelineBasedOnLatestCommit"}, description="Refresh local timeline based on latest commit in addition to timeline hash value")
        public boolean refreshTimelineBasedOnLatestCommit = true;
        @Parameter(names={"--help", "-h"})
        public Boolean help = false;

        public static Builder builder() {
            return new Builder();
        }

        public static class Builder {
            private Integer serverPort = 26754;
            private FileSystemViewStorageType viewStorageType = FileSystemViewStorageType.SPILLABLE_DISK;
            private Integer maxViewMemPerTableInMB = 2048;
            private Double memFractionForCompactionPerTable = 0.001;
            private String baseStorePathForFileGroups = FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue();
            private String rocksDBPath = FileSystemViewStorageConfig.ROCKSDB_BASE_PATH.defaultValue();
            private int numThreads = -1;
            private boolean async = false;
            private boolean compress = true;
            private boolean enableMarkerRequests = false;
            private int markerBatchNumThreads = 20;
            private long markerBatchIntervalMs = 50L;
            private int markerParallelism = 100;
            private boolean refreshTimelineBasedOnLatestCommit = false;

            public Builder serverPort(int serverPort) {
                this.serverPort = serverPort;
                return this;
            }

            public Builder viewStorageType(FileSystemViewStorageType viewStorageType) {
                this.viewStorageType = viewStorageType;
                return this;
            }

            public Builder maxViewMemPerTableInMB(int maxViewMemPerTableInMB) {
                this.maxViewMemPerTableInMB = maxViewMemPerTableInMB;
                return this;
            }

            public Builder memFractionForCompactionPerTable(double memFractionForCompactionPerTable) {
                this.memFractionForCompactionPerTable = memFractionForCompactionPerTable;
                return this;
            }

            public Builder baseStorePathForFileGroups(String baseStorePathForFileGroups) {
                this.baseStorePathForFileGroups = baseStorePathForFileGroups;
                return this;
            }

            public Builder rocksDBPath(String rocksDBPath) {
                this.rocksDBPath = rocksDBPath;
                return this;
            }

            public Builder numThreads(int numThreads) {
                this.numThreads = numThreads;
                return this;
            }

            public Builder async(boolean async) {
                this.async = async;
                return this;
            }

            public Builder compress(boolean compress) {
                this.compress = compress;
                return this;
            }

            public Builder refreshTimelineBasedOnLatestCommit(boolean refreshTimelineBasedOnLatestCommit) {
                this.refreshTimelineBasedOnLatestCommit = refreshTimelineBasedOnLatestCommit;
                return this;
            }

            public Builder enableMarkerRequests(boolean enableMarkerRequests) {
                this.enableMarkerRequests = enableMarkerRequests;
                return this;
            }

            public Builder markerBatchNumThreads(int markerBatchNumThreads) {
                this.markerBatchNumThreads = markerBatchNumThreads;
                return this;
            }

            public Builder markerBatchIntervalMs(long markerBatchIntervalMs) {
                this.markerBatchIntervalMs = markerBatchIntervalMs;
                return this;
            }

            public Builder markerParallelism(int markerParallelism) {
                this.markerParallelism = markerParallelism;
                return this;
            }

            public Config build() {
                Config config = new Config();
                config.serverPort = this.serverPort;
                config.viewStorageType = this.viewStorageType;
                config.maxViewMemPerTableInMB = this.maxViewMemPerTableInMB;
                config.memFractionForCompactionPerTable = this.memFractionForCompactionPerTable;
                config.baseStorePathForFileGroups = this.baseStorePathForFileGroups;
                config.rocksDBPath = this.rocksDBPath;
                config.numThreads = this.numThreads;
                config.async = this.async;
                config.compress = this.compress;
                config.enableMarkerRequests = this.enableMarkerRequests;
                config.markerBatchNumThreads = this.markerBatchNumThreads;
                config.markerBatchIntervalMs = this.markerBatchIntervalMs;
                config.markerParallelism = this.markerParallelism;
                return config;
            }
        }
    }
}

