/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.embedded;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.NetworkUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.timeline.service.TimelineService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmbeddedTimelineService {
    private static final Object SERVICE_LOCK = new Object();
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class);
    private static final AtomicInteger NUM_SERVERS_RUNNING = new AtomicInteger(0);
    private static final Map<TimelineServiceIdentifier, EmbeddedTimelineService> RUNNING_SERVICES = new HashMap<TimelineServiceIdentifier, EmbeddedTimelineService>();
    private static final Registry METRICS_REGISTRY = Registry.getRegistry((String)"TimelineService");
    private static final String NUM_EMBEDDED_TIMELINE_SERVERS = "numEmbeddedTimelineServers";
    private int serverPort;
    private String hostAddr;
    private final HoodieEngineContext context;
    private final StorageConfiguration<?> storageConf;
    private final HoodieWriteConfig writeConfig;
    private TimelineService.Config serviceConfig;
    private final TimelineServiceIdentifier timelineServiceIdentifier;
    private final Set<String> basePaths;
    private transient FileSystemViewManager viewManager;
    private transient TimelineService server;

    private EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig, TimelineServiceIdentifier timelineServiceIdentifier) {
        this.setHostAddr(embeddedTimelineServiceHostAddr);
        this.context = context;
        this.writeConfig = writeConfig;
        this.timelineServiceIdentifier = timelineServiceIdentifier;
        this.basePaths = new HashSet<String>();
        this.basePaths.add(writeConfig.getBasePath());
        this.storageConf = context.getStorageConf();
        this.viewManager = this.createViewManager();
    }

    public static EmbeddedTimelineService getOrStartEmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) throws IOException {
        return EmbeddedTimelineService.getOrStartEmbeddedTimelineService(context, embeddedTimelineServiceHostAddr, writeConfig, TimelineService::new);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static EmbeddedTimelineService getOrStartEmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig, TimelineServiceCreator timelineServiceCreator) throws IOException {
        TimelineServiceIdentifier timelineServiceIdentifier = EmbeddedTimelineService.getTimelineServiceIdentifier(embeddedTimelineServiceHostAddr, writeConfig);
        if (writeConfig.isEmbeddedTimelineServerReuseEnabled()) {
            Object object = SERVICE_LOCK;
            synchronized (object) {
                if (RUNNING_SERVICES.containsKey(timelineServiceIdentifier)) {
                    RUNNING_SERVICES.get(timelineServiceIdentifier).addBasePath(writeConfig.getBasePath());
                    LOG.info("Reusing existing embedded timeline server with configuration: " + EmbeddedTimelineService.RUNNING_SERVICES.get((Object)timelineServiceIdentifier).serviceConfig);
                    return RUNNING_SERVICES.get(timelineServiceIdentifier);
                }
                EmbeddedTimelineService service = EmbeddedTimelineService.createAndStartService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceCreator, timelineServiceIdentifier);
                RUNNING_SERVICES.put(timelineServiceIdentifier, service);
                return service;
            }
        }
        return EmbeddedTimelineService.createAndStartService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceCreator, timelineServiceIdentifier);
    }

    private static EmbeddedTimelineService createAndStartService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig, TimelineServiceCreator timelineServiceCreator, TimelineServiceIdentifier timelineServiceIdentifier) throws IOException {
        EmbeddedTimelineService service = new EmbeddedTimelineService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceIdentifier);
        service.startServer(timelineServiceCreator);
        METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, (long)NUM_SERVERS_RUNNING.incrementAndGet());
        return service;
    }

    public static void shutdownAllTimelineServers() {
        RUNNING_SERVICES.entrySet().forEach(entry -> {
            LOG.info("Closing Timeline server");
            ((EmbeddedTimelineService)entry.getValue()).server.close();
            METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, (long)NUM_SERVERS_RUNNING.decrementAndGet());
            LOG.info("Closed Timeline server");
        });
        RUNNING_SERVICES.clear();
    }

    private FileSystemViewManager createViewManager() {
        FileSystemViewStorageConfig.Builder builder = FileSystemViewStorageConfig.newBuilder().fromProperties((Properties)this.writeConfig.getClientSpecifiedViewStorageConfig().getProps());
        FileSystemViewStorageType storageType = builder.build().getStorageType();
        if (storageType.equals((Object)FileSystemViewStorageType.REMOTE_ONLY) || storageType.equals((Object)FileSystemViewStorageType.REMOTE_FIRST)) {
            builder.withStorageType(FileSystemViewStorageType.MEMORY);
        }
        return FileSystemViewManager.createViewManagerWithTableMetadata((HoodieEngineContext)this.context, (HoodieMetadataConfig)this.writeConfig.getMetadataConfig(), (FileSystemViewStorageConfig)builder.build(), (HoodieCommonConfig)this.writeConfig.getCommonConfig());
    }

    private void startServer(TimelineServiceCreator timelineServiceCreator) throws IOException {
        TimelineService.Config.Builder timelineServiceConfBuilder = TimelineService.Config.builder().serverPort(this.writeConfig.getEmbeddedTimelineServerPort()).numThreads(this.writeConfig.getEmbeddedTimelineServerThreads()).compress(this.writeConfig.getEmbeddedTimelineServerCompressOutput()).async(this.writeConfig.getEmbeddedTimelineServerUseAsync());
        if (this.writeConfig.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED) {
            timelineServiceConfBuilder.enableMarkerRequests(true).markerBatchNumThreads(this.writeConfig.getMarkersTimelineServerBasedBatchNumThreads()).markerBatchIntervalMs(this.writeConfig.getMarkersTimelineServerBasedBatchIntervalMs()).markerParallelism(this.writeConfig.getMarkersDeleteParallelism());
        }
        if (this.writeConfig.isEarlyConflictDetectionEnable()) {
            timelineServiceConfBuilder.earlyConflictDetectionEnable(Boolean.valueOf(true)).earlyConflictDetectionStrategy(this.writeConfig.getEarlyConflictDetectionStrategyClassName()).earlyConflictDetectionCheckCommitConflict(Boolean.valueOf(this.writeConfig.earlyConflictDetectionCheckCommitConflict())).asyncConflictDetectorInitialDelayMs(this.writeConfig.getAsyncConflictDetectorInitialDelayMs()).asyncConflictDetectorPeriodMs(this.writeConfig.getAsyncConflictDetectorPeriodMs()).earlyConflictDetectionMaxAllowableHeartbeatIntervalInMs(Long.valueOf(this.writeConfig.getHoodieClientHeartbeatIntervalInMs() * (long)this.writeConfig.getHoodieClientHeartbeatTolerableMisses().intValue()));
        }
        this.serviceConfig = timelineServiceConfBuilder.build();
        this.server = timelineServiceCreator.create(this.context, (Configuration)this.storageConf.unwrapCopyAs(Configuration.class), this.serviceConfig, HoodieStorageUtils.getStorage((String)this.writeConfig.getBasePath(), (StorageConfiguration)this.storageConf.newInstance()), this.viewManager);
        this.serverPort = this.server.startService();
        LOG.info("Started embedded timeline server at " + this.hostAddr + ":" + this.serverPort);
    }

    private void setHostAddr(String embeddedTimelineServiceHostAddr) {
        if (embeddedTimelineServiceHostAddr != null) {
            LOG.info("Overriding hostIp to (" + embeddedTimelineServiceHostAddr + ") found in spark-conf. It was " + this.hostAddr);
            this.hostAddr = embeddedTimelineServiceHostAddr;
        } else {
            LOG.warn("Unable to find driver bind address from spark config");
            this.hostAddr = NetworkUtils.getHostname();
        }
    }

    public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
        FileSystemViewStorageType viewStorageType = this.writeConfig.getClientSpecifiedViewStorageConfig().shouldEnableBackupForRemoteFileSystemView() ? FileSystemViewStorageType.REMOTE_FIRST : FileSystemViewStorageType.REMOTE_ONLY;
        return FileSystemViewStorageConfig.newBuilder().withStorageType(viewStorageType).withRemoteServerHost(this.hostAddr).withRemoteServerPort(Integer.valueOf(this.serverPort)).withRemoteTimelineClientTimeoutSecs(this.writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs()).withRemoteTimelineClientRetry(this.writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled()).withRemoteTimelineClientMaxRetryNumbers(this.writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers()).withRemoteTimelineInitialRetryIntervalMs(this.writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs()).withRemoteTimelineClientMaxRetryIntervalMs(this.writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs()).withRemoteTimelineClientRetryExceptions(this.writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions()).build();
    }

    public FileSystemViewManager getViewManager() {
        return this.viewManager;
    }

    private void addBasePath(String basePath) {
        this.basePaths.add(basePath);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopForBasePath(String basePath) {
        Object object = SERVICE_LOCK;
        synchronized (object) {
            this.basePaths.remove(basePath);
            if (this.basePaths.isEmpty()) {
                RUNNING_SERVICES.remove(this.timelineServiceIdentifier);
            }
        }
        if (this.server != null) {
            this.server.unregisterBasePath(basePath);
        }
        if (this.basePaths.isEmpty() && null != this.server) {
            LOG.info("Closing Timeline server");
            this.server.close();
            METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, (long)NUM_SERVERS_RUNNING.decrementAndGet());
            this.server = null;
            this.viewManager = null;
            LOG.info("Closed Timeline server");
        }
    }

    private static TimelineServiceIdentifier getTimelineServiceIdentifier(String hostAddr, HoodieWriteConfig writeConfig) {
        return new TimelineServiceIdentifier(hostAddr, writeConfig.getMarkersType(), writeConfig.isMetadataTableEnabled(), writeConfig.isEarlyConflictDetectionEnable());
    }

    static class TimelineServiceIdentifier {
        private final String hostAddr;
        private final MarkerType markerType;
        private final boolean isMetadataEnabled;
        private final boolean isEarlyConflictDetectionEnable;

        public TimelineServiceIdentifier(String hostAddr, MarkerType markerType, boolean isMetadataEnabled, boolean isEarlyConflictDetectionEnable) {
            this.hostAddr = hostAddr;
            this.markerType = markerType;
            this.isMetadataEnabled = isMetadataEnabled;
            this.isEarlyConflictDetectionEnable = isEarlyConflictDetectionEnable;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TimelineServiceIdentifier)) {
                return false;
            }
            TimelineServiceIdentifier that = (TimelineServiceIdentifier)o;
            if (this.hostAddr != null && that.hostAddr != null) {
                return this.isMetadataEnabled == that.isMetadataEnabled && this.isEarlyConflictDetectionEnable == that.isEarlyConflictDetectionEnable && this.hostAddr.equals(that.hostAddr) && this.markerType == that.markerType;
            }
            return this.hostAddr == null && that.hostAddr == null;
        }

        public int hashCode() {
            return Objects.hash(this.hostAddr, this.markerType, this.isMetadataEnabled, this.isEarlyConflictDetectionEnable);
        }
    }

    @FunctionalInterface
    static interface TimelineServiceCreator {
        public TimelineService create(HoodieEngineContext var1, Configuration var2, TimelineService.Config var3, HoodieStorage var4, FileSystemViewManager var5) throws IOException;
    }
}

