/*
 * Decompiled with CFR 0.152.
 */
package io.trino.hdfs.rubix;

import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.MoreCollectors;
import com.google.common.io.Closer;
import com.qubole.rubix.bookkeeper.BookKeeper;
import com.qubole.rubix.bookkeeper.BookKeeperServer;
import com.qubole.rubix.bookkeeper.LocalDataTransferServer;
import com.qubole.rubix.common.metrics.MetricsReporterType;
import com.qubole.rubix.core.CachingFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoAdlFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoAzureBlobFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoGoogleHadoopFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoNativeAzureFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoSecureAzureBlobFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoSecureNativeAzureFileSystem;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.thrift.BookKeeperService;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeExecutor;
import dev.failsafe.Policy;
import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import io.airlift.log.Logger;
import io.trino.hdfs.ConfigurationUtils;
import io.trino.hdfs.DynamicConfigurationProvider;
import io.trino.hdfs.HdfsConfigurationInitializer;
import io.trino.hdfs.rubix.CachingTrinoS3FileSystem;
import io.trino.hdfs.rubix.DummyBookKeeper;
import io.trino.hdfs.rubix.RubixConfig;
import io.trino.hdfs.rubix.RubixHdfsInitializer;
import io.trino.hdfs.rubix.TrinoClusterManager;
import io.trino.plugin.base.CatalogName;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.HostAddress;
import io.trino.spi.Node;
import io.trino.spi.NodeManager;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.hadoop.conf.Configuration;

/**
 * Bootstraps the embedded Rubix cache for a single catalog and rewrites Hadoop
 * {@link Configuration} objects so that the supported file system schemes resolve to
 * Rubix caching implementations.
 *
 * <p>On a coordinator that is configured not to run the Rubix servers, only the metrics
 * are set up and a {@link DummyBookKeeper} is installed. On every other node a
 * {@link BookKeeperServer} and the {@link LocalDataTransferServer} are started once a
 * coordinator is visible through the {@link NodeManager}.
 */
public class RubixInitializer {
    private static final String RUBIX_S3_FS_CLASS_NAME = CachingTrinoS3FileSystem.class.getName();
    private static final String RUBIX_NATIVE_AZURE_FS_CLASS_NAME = CachingPrestoNativeAzureFileSystem.class.getName();
    private static final String RUBIX_SECURE_NATIVE_AZURE_FS_CLASS_NAME = CachingPrestoSecureNativeAzureFileSystem.class.getName();
    private static final String RUBIX_AZURE_BLOB_FS_CLASS_NAME = CachingPrestoAzureBlobFileSystem.class.getName();
    private static final String RUBIX_SECURE_AZURE_BLOB_FS_CLASS_NAME = CachingPrestoSecureAzureBlobFileSystem.class.getName();
    private static final String RUBIX_SECURE_ADL_CLASS_NAME = CachingPrestoAdlFileSystem.class.getName();
    private static final String RUBIX_GS_FS_CLASS_NAME = CachingPrestoGoogleHadoopFileSystem.class.getName();

    /** Configuration key marking a configuration as created for Rubix's own servers. */
    private static final String FILESYSTEM_OWNED_BY_RUBIX_CONFIG_PROPERTY = "presto.fs.owned.by.rubix";

    /**
     * Default retry behaviour while waiting for a coordinator: retry on
     * {@link TrinoException} once per second for at most ten minutes. The attempt
     * count is set to -1 so that only the maximum duration bounds the retries.
     */
    private static final FailsafeExecutor<?> DEFAULT_COORDINATOR_FAILSAFE_EXECUTOR = Failsafe.with(
            RetryPolicy.builder()
                    .handle(TrinoException.class)
                    .withMaxAttempts(-1)
                    .withMaxDuration(Duration.ofMinutes(10))
                    .withDelay(Duration.ofSeconds(1))
                    .build());

    private static final Logger log = Logger.get(RubixInitializer.class);

    private final FailsafeExecutor<?> coordinatorFailsafeExecutor;
    private final boolean startServerOnCoordinator;
    private final boolean parallelWarmupEnabled;
    private final Optional<String> cacheLocation;
    private final long cacheTtlMillis;
    private final int diskUsagePercentage;
    private final int bookKeeperServerPort;
    private final int dataTransferServerPort;
    private final NodeManager nodeManager;
    private final CatalogName catalogName;
    private final HdfsConfigurationInitializer hdfsConfigurationInitializer;
    private final RubixHdfsInitializer rubixHdfsInitializer;

    // Volatile flag flipped to true as the last step of a successful initialization;
    // enableRubix() falls back to disabling the cache while it is still false.
    private volatile boolean cacheReady;

    // Assigned by getRubixServerConfiguration(); required by updateRubixConfiguration().
    @Nullable
    private HostAddress masterAddress;

    // Non-null only between startRubix() and stopRubix().
    @Nullable
    private BookKeeperServer bookKeeperServer;

    /**
     * Creates an initializer that uses the default coordinator retry policy.
     */
    @Inject
    public RubixInitializer(RubixConfig rubixConfig, NodeManager nodeManager, CatalogName catalogName, HdfsConfigurationInitializer hdfsConfigurationInitializer, RubixHdfsInitializer rubixHdfsInitializer) {
        this(DEFAULT_COORDINATOR_FAILSAFE_EXECUTOR, rubixConfig, nodeManager, catalogName, hdfsConfigurationInitializer, rubixHdfsInitializer);
    }

    /**
     * Creates an initializer with an explicit executor for the coordinator wait, which
     * lets tests substitute their own retry behaviour. All Rubix settings are copied out
     * of {@code rubixConfig} here.
     */
    @VisibleForTesting
    RubixInitializer(FailsafeExecutor<?> coordinatorFailsafeExecutor, RubixConfig rubixConfig, NodeManager nodeManager, CatalogName catalogName, HdfsConfigurationInitializer hdfsConfigurationInitializer, RubixHdfsInitializer rubixHdfsInitializer) {
        this.coordinatorFailsafeExecutor = coordinatorFailsafeExecutor;

        // Snapshot of the Rubix settings.
        this.startServerOnCoordinator = rubixConfig.isStartServerOnCoordinator();
        this.parallelWarmupEnabled = rubixConfig.getReadMode().isParallelWarmupEnabled();
        this.cacheLocation = rubixConfig.getCacheLocation();
        this.cacheTtlMillis = rubixConfig.getCacheTtl().toMillis();
        this.diskUsagePercentage = rubixConfig.getDiskUsagePercentage();
        this.bookKeeperServerPort = rubixConfig.getBookKeeperServerPort();
        this.dataTransferServerPort = rubixConfig.getDataTransferServerPort();

        // Collaborators.
        this.nodeManager = nodeManager;
        this.catalogName = catalogName;
        this.hdfsConfigurationInitializer = hdfsConfigurationInitializer;
        this.rubixHdfsInitializer = rubixHdfsInitializer;
    }

    /**
     * Initializes Rubix on the current node.
     *
     * <p>A coordinator that does not run the Rubix servers only gets its metrics set up.
     * Any other node waits until a coordinator is visible and then starts the servers.
     *
     * @throws IllegalArgumentException if the servers must be started but no cache
     *         location was configured
     */
    void initializeRubix() {
        if (isCoordinatorWithoutServer()) {
            setupRubixMetrics();
            cacheReady = true;
            return;
        }

        if (cacheLocation.isEmpty()) {
            throw new IllegalArgumentException("caching directories were not provided");
        }

        waitForCoordinator();
        startRubix();
    }

    /**
     * Stops the data transfer server and, if one was started, the BookKeeper server.
     *
     * <p>Both actions are registered with a Guava {@link Closer}, which closes registered
     * resources in reverse registration order, so the data transfer server is stopped
     * first.
     *
     * @throws IOException if closing the registered resources fails
     */
    @PreDestroy
    public void stopRubix() throws IOException {
        try (Closer closer = Closer.create()) {
            closer.register(() -> {
                if (bookKeeperServer != null) {
                    bookKeeperServer.stopServer();
                    bookKeeperServer = null;
                }
            });
            closer.register(LocalDataTransferServer::stopServer);
        }
    }

    /**
     * Applies the Rubix caching settings to {@code configuration}, or disables caching on
     * it when Rubix has not finished initializing yet.
     *
     * @param configuration the Hadoop configuration to modify in place
     */
    public void enableRubix(Configuration configuration) {
        if (cacheReady) {
            updateRubixConfiguration(configuration, Owner.PRESTO);
            DynamicConfigurationProvider.setCacheKey(configuration, "rubix_enabled");
            return;
        }
        disableRubix(configuration);
    }

    /**
     * Turns off Rubix data caching on {@code configuration}.
     *
     * @param configuration the Hadoop configuration to modify in place
     */
    public void disableRubix(Configuration configuration) {
        CacheConfig.setCacheDataEnabled(configuration, false);
        DynamicConfigurationProvider.setCacheKey(configuration, "rubix_disabled");
    }

    /**
     * Determines who owns {@code configuration}.
     *
     * @param configuration the Hadoop configuration to inspect
     * @return {@link Owner#RUBIX} when the ownership property is exactly {@code "true"},
     *         otherwise {@link Owner#PRESTO}
     */
    public static Owner getConfigurationOwner(Configuration configuration) {
        boolean ownedByRubix = configuration.get(FILESYSTEM_OWNED_BY_RUBIX_CONFIG_PROPERTY, "").equals("true");
        return ownedByRubix ? Owner.RUBIX : Owner.PRESTO;
    }

    /**
     * Reports whether both the data transfer server and the BookKeeper server are up.
     */
    @VisibleForTesting
    boolean isServerUp() {
        if (!LocalDataTransferServer.isServerUp()) {
            return false;
        }
        return bookKeeperServer != null && bookKeeperServer.isServerUp();
    }

    /**
     * Runs the coordinator check through {@link #coordinatorFailsafeExecutor}; each
     * attempt fails with a {@link TrinoException} while no coordinator is listed by the
     * node manager.
     */
    private void waitForCoordinator() {
        coordinatorFailsafeExecutor.run(() -> {
            boolean coordinatorVisible = nodeManager.getAllNodes().stream().anyMatch(Node::isCoordinator);
            if (!coordinatorVisible) {
                throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "No coordinator node available");
            }
        });
    }

    /**
     * Starts the BookKeeper and data transfer servers, registers the local BookKeeper
     * and the node manager with Rubix, and finally marks the cache as ready.
     */
    private void startRubix() {
        Configuration serverConfiguration = getRubixServerConfiguration();
        MetricRegistry metrics = new MetricRegistry();

        bookKeeperServer = new BookKeeperServer();
        BookKeeper localBookKeeper = bookKeeperServer.startServer(serverConfiguration, metrics);
        LocalDataTransferServer.startServer(serverConfiguration, metrics, localBookKeeper);

        CachingFileSystem.setLocalBookKeeper(serverConfiguration, localBookKeeper, "catalog=" + catalogName);
        TrinoClusterManager.setNodeManager(nodeManager);

        log.info("Rubix initialized successfully");
        cacheReady = true;
    }

    /**
     * Sets up a BookKeeper server instance without starting it and installs a
     * {@link DummyBookKeeper} as the local BookKeeper. Used on a coordinator that does
     * not run the Rubix servers.
     */
    private void setupRubixMetrics() {
        Configuration serverConfiguration = getRubixServerConfiguration();
        new BookKeeperServer().setupServer(serverConfiguration, new MetricRegistry());

        CachingFileSystem.setLocalBookKeeper(serverConfiguration, new DummyBookKeeper(), "catalog=" + catalogName);
        TrinoClusterManager.setNodeManager(nodeManager);
    }

    /**
     * Builds the configuration used by Rubix's own servers.
     *
     * <p>As a side effect this records the coordinator's address in
     * {@link #masterAddress}. Exactly one coordinator must be listed by the node manager;
     * {@link MoreCollectors#onlyElement()} fails otherwise.
     *
     * @return a freshly initialized configuration owned by Rubix
     */
    private Configuration getRubixServerConfiguration() {
        Node coordinator = nodeManager.getAllNodes().stream()
                .filter(Node::isCoordinator)
                .collect(MoreCollectors.onlyElement());
        masterAddress = coordinator.getHostAndPort();

        Configuration serverConfiguration = ConfigurationUtils.getInitialConfiguration();
        hdfsConfigurationInitializer.initializeConfiguration(serverConfiguration);
        updateRubixConfiguration(serverConfiguration, Owner.RUBIX);
        DynamicConfigurationProvider.setCacheKey(serverConfiguration, "rubix_internal");
        return serverConfiguration;
    }

    /**
     * Writes all Rubix settings into {@code config}.
     *
     * @param config the Hadoop configuration to modify in place
     * @param owner who the configuration is being prepared for; {@link Owner#RUBIX}
     *        additionally sets the ownership marker property
     * @throws IllegalStateException if the coordinator address has not been resolved yet
     */
    private void updateRubixConfiguration(Configuration config, Owner owner) {
        Preconditions.checkState(masterAddress != null, "masterAddress is not set");

        CacheConfig.setCacheDataEnabled(config, true);
        CacheConfig.setOnMaster(config, nodeManager.getCurrentNode().isCoordinator());
        CacheConfig.setCoordinatorHostName(config, masterAddress.getHostText());
        CacheConfig.setIsParallelWarmupEnabled(config, parallelWarmupEnabled);
        CacheConfig.setCacheDataExpirationAfterWrite(config, cacheTtlMillis);
        CacheConfig.setCacheDataFullnessPercentage(config, diskUsagePercentage);
        CacheConfig.setBookKeeperServerPort(config, bookKeeperServerPort);
        CacheConfig.setDataTransferServerPort(config, dataTransferServerPort);
        CacheConfig.setMetricsReporters(config, MetricsReporterType.JMX.name());
        CacheConfig.setEmbeddedMode(config, true);
        CacheConfig.enableHeartbeat(config, false);
        CacheConfig.setClusterNodeRefreshTime(config, 10);

        if (isCoordinatorWithoutServer()) {
            // No local cache on this node, so no cache directory is needed.
            CacheConfig.setCacheDataOnMasterEnabled(config, false);
        } else {
            CacheConfig.setCacheDataDirPrefix(config, cacheLocation.orElseThrow());
        }

        registerCachingFileSystems(config);

        if (owner == Owner.RUBIX) {
            config.set(FILESYSTEM_OWNED_BY_RUBIX_CONFIG_PROPERTY, "true");
        }

        CacheConfig.setPrestoClusterManager(config, TrinoClusterManager.class.getName());
        rubixHdfsInitializer.initializeConfiguration(config);
    }

    /**
     * Tells whether the current node is a coordinator that is configured not to run the
     * Rubix servers.
     */
    private boolean isCoordinatorWithoutServer() {
        return nodeManager.getCurrentNode().isCoordinator() && !startServerOnCoordinator;
    }

    /**
     * Points the {@code fs.<scheme>.impl} properties of the S3, Azure, ADL and GCS
     * schemes at their Rubix caching file system classes.
     */
    private static void registerCachingFileSystems(Configuration config) {
        config.set("fs.s3.impl", RUBIX_S3_FS_CLASS_NAME);
        config.set("fs.s3a.impl", RUBIX_S3_FS_CLASS_NAME);
        config.set("fs.s3n.impl", RUBIX_S3_FS_CLASS_NAME);

        config.set("fs.wasb.impl", RUBIX_NATIVE_AZURE_FS_CLASS_NAME);
        config.set("fs.wasbs.impl", RUBIX_SECURE_NATIVE_AZURE_FS_CLASS_NAME);
        config.set("fs.abfs.impl", RUBIX_AZURE_BLOB_FS_CLASS_NAME);
        config.set("fs.abfss.impl", RUBIX_SECURE_AZURE_BLOB_FS_CLASS_NAME);
        config.set("fs.adl.impl", RUBIX_SECURE_ADL_CLASS_NAME);

        config.set("fs.gs.impl", RUBIX_GS_FS_CLASS_NAME);
    }

    /**
     * Identifies for whom a Hadoop configuration was prepared: regular engine-side file
     * system access ({@link #PRESTO}) or Rubix's own servers ({@link #RUBIX}).
     */
    public enum Owner {
        PRESTO,
        RUBIX
    }
}

