/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.druid.client.DataSegmentInterner;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.FilteringSegmentCallback;
import org.apache.druid.client.HttpServerInventoryViewConfig;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.ServerView;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;

public class HttpServerInventoryView
implements ServerInventoryView,
FilteredServerInventoryView {
    /** Jackson type token for a snapshot of segment change requests; handed to each per-server syncer as its response type. */
    public static final TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>> SEGMENT_LIST_RESP_TYPE_REF = new TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>>(){};
    private final EmittingLogger log = new EmittingLogger(HttpServerInventoryView.class);
    private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
    // Guards start()/stop(); callback registration is rejected once it reports "started".
    private final LifecycleLock lifecycleLock = new LifecycleLock();
    // Each callback is mapped to the executor it must be invoked on.
    private final ConcurrentMap<ServerView.ServerRemovedCallback, Executor> serverCallbacks = new ConcurrentHashMap<ServerView.ServerRemovedCallback, Executor>();
    private final ConcurrentMap<ServerView.SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<ServerView.SegmentCallback, Executor>();
    // Filters of callbacks that were registered together with a predicate.
    private final ConcurrentMap<ServerView.SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new ConcurrentHashMap<ServerView.SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>>();
    // Filter supplied at construction time; always part of finalPredicate.
    private final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;
    // defaultFilter OR-ed with all registered callback filters; rebuilt by updateFinalPredicate().
    private volatile Predicate<Pair<DruidServerMetadata, DataSegment>> finalPredicate;
    // Known servers keyed by server name. The map object also serves as the monitor for add/remove/reset.
    // Explicit type arguments (instead of a raw constructor call) keep this declaration free of unchecked conversions.
    private final ConcurrentHashMap<String, DruidServerHolder> servers = new ConcurrentHashMap<String, DruidServerHolder>();
    private final String execNamePrefix;
    private final ScheduledExecutorFactory executorFactory;
    // Both executors are created in start() and shut down in stop(); null until started.
    private volatile ScheduledExecutorService inventorySyncExecutor;
    private volatile ScheduledExecutorService monitoringExecutor;
    private final HttpClient httpClient;
    private final ObjectMapper smileMapper;
    private final HttpServerInventoryViewConfig config;
    private final ServiceEmitter serviceEmitter;

    /**
     * Stores the collaborators of this view. No executor is created and no discovery
     * listener is registered here; that happens in {@link #start()}.
     *
     * @param smileMapper                mapper kept for the per-server syncers
     * @param httpClient                 HTTP client kept for the per-server syncers
     * @param druidNodeDiscoveryProvider source of the node discovery used in {@link #start()}
     * @param defaultFilter              filter that is always applied; also the initial effective predicate
     * @param config                     thread count and timeout settings
     * @param serviceEmitter             sink for the server status metrics
     * @param executorFactory            factory for the sync and monitoring executors
     * @param execNamePrefix             prefix used in executor names and log messages
     */
    public HttpServerInventoryView(ObjectMapper smileMapper, HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter, HttpServerInventoryViewConfig config, ServiceEmitter serviceEmitter, ScheduledExecutorFactory executorFactory, String execNamePrefix) {
        this.smileMapper = smileMapper;
        this.httpClient = httpClient;
        this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
        this.config = config;
        this.serviceEmitter = serviceEmitter;
        this.executorFactory = executorFactory;
        this.execNamePrefix = execNamePrefix;
        // Until a callback registers its own filter, the effective predicate is just the default one.
        this.defaultFilter = defaultFilter;
        this.finalPredicate = defaultFilter;
    }

    /**
     * Starts the view: creates the sync and monitoring executors, registers a listener
     * on the "dataNodeService" discovery, and schedules the periodic health check and
     * metric emission on the monitoring executor.
     *
     * @throws ISE if the lifecycle lock does not permit starting
     */
    @LifecycleStart
    public void start() {
        synchronized (this.lifecycleLock) {
            if (!this.lifecycleLock.canStart()) {
                throw new ISE("Could not start lifecycle");
            }
            this.log.info("Starting executor[%s].", this.execNamePrefix);
            try {
                this.inventorySyncExecutor = this.executorFactory.create(this.config.getNumThreads(), this.execNamePrefix + "-%s");
                this.monitoringExecutor = this.executorFactory.create(1, this.execNamePrefix + "-monitor-%s");

                final DruidNodeDiscovery druidNodeDiscovery = this.druidNodeDiscoveryProvider.getForService("dataNodeService");
                druidNodeDiscovery.registerListener(new DruidNodeDiscovery.Listener() {
                    // Ensures the "initialized" hand-off below is submitted at most once.
                    private final AtomicBoolean initialized = new AtomicBoolean(false);

                    @Override
                    public void nodesAdded(Collection<DiscoveryDruidNode> nodes) {
                        nodes.forEach(added -> HttpServerInventoryView.this.serverAdded(this.toDruidServer(added)));
                    }

                    @Override
                    public void nodesRemoved(Collection<DiscoveryDruidNode> nodes) {
                        nodes.forEach(removed -> HttpServerInventoryView.this.serverRemoved(this.toDruidServer(removed)));
                    }

                    @Override
                    public void nodeViewInitialized() {
                        if (this.initialized.getAndSet(true)) {
                            return;
                        }
                        // The initialization routine sleeps while waiting, so it runs on the sync executor
                        // rather than on the thread delivering this notification.
                        HttpServerInventoryView.this.inventorySyncExecutor.execute(() -> HttpServerInventoryView.this.serverInventoryInitialized());
                    }

                    // Builds the DruidServer view of a discovered node.
                    private DruidServer toDruidServer(DiscoveryDruidNode node) {
                        final DruidNode druidNode = node.getDruidNode();
                        final DataNodeService dataNodeService = node.getService("dataNodeService", DataNodeService.class);
                        if (dataNodeService != null) {
                            return new DruidServer(druidNode.getHostAndPortToUse(), druidNode.getHostAndPort(), druidNode.getHostAndTlsPort(), dataNodeService.getMaxSize(), dataNodeService.getServerType(), dataNodeService.getTier(), dataNodeService.getPriority());
                        }
                        // No data-node service on this node: use zero size/priority, the role-derived type and the default tier.
                        return new DruidServer(druidNode.getHostAndPortToUse(), druidNode.getHostAndPort(), druidNode.getHostAndTlsPort(), 0L, ServerType.fromNodeRole(node.getNodeRole()), "_default_tier", 0);
                    }
                });

                // Health check: first run after 60 seconds, then with a 5 minute delay between runs.
                ScheduledExecutors.scheduleWithFixedDelay(this.monitoringExecutor, Duration.standardSeconds(60L), Duration.standardMinutes(5L), this::checkAndResetUnhealthyServers);
                // Metrics: first run after 30 seconds, then once per minute.
                ScheduledExecutors.scheduleAtFixedRate(this.monitoringExecutor, Duration.standardSeconds(30L), Duration.standardMinutes(1L), this::emitServerStatusMetrics);

                this.lifecycleLock.started();
            }
            finally {
                this.lifecycleLock.exitStart();
            }
            this.log.info("Started executor[%s].", this.execNamePrefix);
        }
    }

    /**
     * Stops the view by shutting down whichever of the two executors have been created.
     *
     * @throws ISE if the lifecycle lock does not permit stopping
     */
    @LifecycleStop
    public void stop() {
        synchronized (this.lifecycleLock) {
            if (!this.lifecycleLock.canStop()) {
                throw new ISE("can't stop.");
            }
            this.log.info("Stopping executor[%s].", this.execNamePrefix);
            // Either executor may still be null if start() never got as far as creating it.
            if (null != this.inventorySyncExecutor) {
                this.inventorySyncExecutor.shutdownNow();
            }
            if (null != this.monitoringExecutor) {
                this.monitoringExecutor.shutdownNow();
            }
            this.log.info("Stopped executor[%s].", this.execNamePrefix);
        }
    }

    /**
     * Registers a segment callback that only sees events accepted by {@code filter}.
     * The callback is wrapped in a {@link FilteringSegmentCallback}, and the filter is
     * folded into the effective predicate.
     *
     * @param exec     executor the callback is invoked on
     * @param callback callback to register
     * @param filter   predicate over (server metadata, segment) pairs
     * @throws ISE if the lifecycle has already started
     */
    @Override
    public void registerSegmentCallback(Executor exec, ServerView.SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter) {
        if (this.lifecycleLock.isStarted()) {
            throw new ISE("Lifecycle has already started.");
        }
        final ServerView.SegmentCallback wrapped = new FilteringSegmentCallback(callback, filter);
        // The wrapper, not the caller's callback, is the key in both maps.
        this.segmentCallbacks.put(wrapped, exec);
        this.segmentPredicates.put(wrapped, filter);
        this.updateFinalPredicate();
    }

    /**
     * Registers a callback to be run on {@code exec} whenever a server is removed.
     *
     * @param exec     executor the callback is invoked on
     * @param callback callback to register
     * @throws ISE if the lifecycle has already started
     */
    @Override
    public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback) {
        final boolean alreadyStarted = this.lifecycleLock.isStarted();
        if (alreadyStarted) {
            throw new ISE("Lifecycle has already started.");
        }
        this.serverCallbacks.put(callback, exec);
    }

    /**
     * Registers a segment callback without a filter of its own. The effective
     * predicate is left unchanged.
     *
     * @param exec     executor the callback is invoked on
     * @param callback callback to register
     * @throws ISE if the lifecycle has already started
     */
    @Override
    public void registerSegmentCallback(Executor exec, ServerView.SegmentCallback callback) {
        final boolean alreadyStarted = this.lifecycleLock.isStarted();
        if (alreadyStarted) {
            throw new ISE("Lifecycle has already started.");
        }
        this.segmentCallbacks.put(callback, exec);
    }

    /**
     * Looks up a server by its key in the server map.
     *
     * @param containerKey key of the server in the inventory
     * @return the matching server, or {@code null} when no holder is registered under that key
     */
    @Override
    public DruidServer getInventoryValue(String containerKey) {
        final DruidServerHolder found = this.servers.get(containerKey);
        return found == null ? null : found.druidServer;
    }

    /**
     * Returns the servers currently held by this view, produced by transforming the
     * values of the server map.
     *
     * @return collection of the known servers
     */
    @Override
    public Collection<DruidServer> getInventory() {
        final Collection<DruidServerHolder> holders = this.servers.values();
        return Collections2.transform(holders, holder -> holder.druidServer);
    }

    /**
     * Submits {@code fn} once per registered segment callback, each on that callback's
     * own executor. A callback for which {@code fn} returns
     * {@link ServerView.CallbackAction#UNREGISTER} is removed; if it had a filter, the
     * effective predicate is rebuilt.
     *
     * @param fn action to apply to every registered segment callback
     */
    private void runSegmentCallbacks(final Function<ServerView.SegmentCallback, ServerView.CallbackAction> fn) {
        // A typed entry is required here: with a raw Map.Entry the key is an Object and cannot be passed to fn.
        for (final Map.Entry<ServerView.SegmentCallback, Executor> entry : this.segmentCallbacks.entrySet()) {
            final ServerView.SegmentCallback callback = entry.getKey();
            entry.getValue().execute(() -> {
                if (ServerView.CallbackAction.UNREGISTER != fn.apply(callback)) {
                    return;
                }
                this.segmentCallbacks.remove(callback);
                // Only callbacks registered with a filter have an entry in segmentPredicates.
                if (this.segmentPredicates.remove(callback) != null) {
                    this.updateFinalPredicate();
                }
            });
        }
    }

    /**
     * Notifies every registered server-removed callback, each on its own executor, that
     * {@code server} is gone. Callbacks that answer
     * {@link ServerView.CallbackAction#UNREGISTER} are dropped.
     *
     * @param server the server that was removed
     */
    private void runServerRemovedCallbacks(final DruidServer server) {
        for (final Map.Entry<ServerView.ServerRemovedCallback, Executor> entry : this.serverCallbacks.entrySet()) {
            final ServerView.ServerRemovedCallback callback = entry.getKey();
            final Executor executor = entry.getValue();
            executor.execute(() -> {
                final ServerView.CallbackAction action = callback.serverRemoved(server);
                if (action == ServerView.CallbackAction.UNREGISTER) {
                    this.serverCallbacks.remove(callback);
                }
            });
        }
    }

    /**
     * Waits for the servers known at call time to complete a first successful sync,
     * then fires the "segment view initialized" callbacks.
     *
     * <p>The wait polls every 5 seconds and is bounded by the configured server timeout
     * plus 10 seconds. Servers that are stopped in the meantime are no longer waited for.
     * Servers still unsynced when the wait ends are logged, and the callbacks run anyway.
     *
     * @throws RE if the waiting thread is interrupted; the interrupt flag is restored first
     */
    private void serverInventoryInitialized() {
        final long start = System.currentTimeMillis();
        final long serverSyncWaitTimeout = this.config.getServerTimeout() + 10000L;

        final List<DruidServerHolder> uninitializedServers = new ArrayList<DruidServerHolder>();
        for (DruidServerHolder server : this.servers.values()) {
            if (!server.isSyncedSuccessfullyAtleastOnce()) {
                uninitializedServers.add(server);
            }
        }

        while (!uninitializedServers.isEmpty() && System.currentTimeMillis() - start < serverSyncWaitTimeout) {
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException ex) {
                // Restore the interrupt status so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RE(ex, "Interrupted while waiting for queryable server initial successful sync.");
            }
            this.log.info("Waiting for [%d] servers to sync successfully.", uninitializedServers.size());
            uninitializedServers.removeIf(serverHolder -> serverHolder.isSyncedSuccessfullyAtleastOnce() || serverHolder.isStopped());
        }

        if (uninitializedServers.isEmpty()) {
            this.log.info("All servers have been synced successfully at least once.");
        } else {
            for (DruidServerHolder server : uninitializedServers) {
                this.log.warn("Server[%s] might not yet be synced successfully. We will continue to retry that in the background.", server.druidServer.getName());
            }
        }

        this.log.info("Invoking segment view initialized callbacks.");
        this.runSegmentCallbacks(ServerView.SegmentCallback::segmentViewInitialized);
    }

    /**
     * Rebuilds the effective predicate as the default filter OR-ed with the filters of
     * all currently registered filtering callbacks.
     */
    private void updateFinalPredicate() {
        final Predicate<Pair<DruidServerMetadata, DataSegment>> anyCallbackFilter = Predicates.or(this.segmentPredicates.values());
        this.finalPredicate = Predicates.or(this.defaultFilter, anyCallbackFilter);
    }

    /**
     * Registers {@code server} and starts syncing it, unless a holder with the same
     * name already exists, in which case the call only logs. Runs under the monitor of
     * the server map.
     *
     * @param server the server that appeared
     */
    @VisibleForTesting
    void serverAdded(DruidServer server) {
        synchronized (this.servers) {
            if (this.servers.get(server.getName()) != null) {
                this.log.info("Server[%s] already exists.", server.getName());
                return;
            }
            this.log.info("Server[%s] appeared.", server.getName());
            final DruidServerHolder created = new DruidServerHolder(server);
            // Publish the holder before starting its syncer, as the original ordering does.
            this.servers.put(server.getName(), created);
            created.start();
        }
    }

    /**
     * Unregisters the holder stored under the name of {@code server}, stops it and
     * notifies the server-removed callbacks. Unknown servers are only logged. Runs under
     * the monitor of the server map.
     *
     * @param server the server that disappeared
     */
    private void serverRemoved(DruidServer server) {
        synchronized (this.servers) {
            final DruidServerHolder removed = this.servers.remove(server.getName());
            if (removed == null) {
                this.log.info("Ignoring remove notification for unknown server[%s].", server.getName());
                return;
            }
            this.log.info("Server[%s] disappeared.", server.getName());
            removed.stop();
            // Callbacks receive the server object held by this view, not the argument instance.
            this.runServerRemovedCallbacks(removed.druidServer);
        }
    }

    /**
     * Collects the debug info of every server's syncer, keyed by the server's key in
     * the server map.
     *
     * @return a new map from server key to that syncer's debug info
     * @throws IllegalArgumentException if the lifecycle is not started within 1 millisecond
     */
    public Map<String, Object> getDebugInfo() {
        Preconditions.checkArgument(this.lifecycleLock.awaitStarted(1L, TimeUnit.MILLISECONDS));
        final Map<String, Object> debugInfo = Maps.newHashMapWithExpectedSize(this.servers.size());
        for (Map.Entry<String, DruidServerHolder> serverEntry : this.servers.entrySet()) {
            debugInfo.put(serverEntry.getKey(), serverEntry.getValue().syncer.getDebugInfo());
        }
        return debugInfo;
    }

    /**
     * Resets every server whose syncer reports that it needs a reset: the server is
     * removed and then re-added as a copy without segments, which creates a fresh holder
     * and syncer.
     *
     * <p>The check runs over a snapshot of the server map. A server is reset only if the
     * holder from the snapshot is still the one registered under its key, so a holder that
     * replaced it in the meantime is left alone.
     */
    @VisibleForTesting
    void checkAndResetUnhealthyServers() {
        // Iterate over a copy so that the resets below cannot interfere with the iteration.
        final ImmutableSet<Map.Entry<String, DruidServerHolder>> serverEntrySet = ImmutableSet.copyOf(this.servers.entrySet());
        for (Map.Entry<String, DruidServerHolder> e : serverEntrySet) {
            final DruidServerHolder serverHolder = e.getValue();
            if (!serverHolder.syncer.needsReset()) {
                continue;
            }
            synchronized (this.servers) {
                // Identity check, not containsKey: the key may by now map to a newer holder for the same server.
                if (this.servers.get(e.getKey()) == serverHolder) {
                    this.log.warn("Resetting server[%s] with state[%s] as it is not syncing properly.", serverHolder.druidServer.getName(), serverHolder.syncer.getDebugInfo());
                    this.serverRemoved(serverHolder.druidServer);
                    this.serverAdded(serverHolder.druidServer.copyWithoutSegments());
                }
            }
        }
    }

    /**
     * Emits, for each server in a snapshot of the server map, a
     * {@code serverview/sync/healthy} metric (1 when the syncer reports a successful
     * sync, otherwise 0) and, when the unstable time is positive, a
     * {@code serverview/sync/unstableTime} metric. Any exception is logged and swallowed.
     */
    private void emitServerStatusMetrics() {
        // A single builder is reused; the two dimensions are overwritten for each server.
        final ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent.builder();
        try {
            final ImmutableMap<String, DruidServerHolder> serversCopy = ImmutableMap.copyOf(this.servers);
            serversCopy.forEach((serverName, serverHolder) -> {
                eventBuilder.setDimension("tier", serverHolder.druidServer.getTier());
                eventBuilder.setDimension("server", serverName);

                final int healthy = serverHolder.syncer.isSyncedSuccessfully() ? 1 : 0;
                this.serviceEmitter.emit(eventBuilder.build("serverview/sync/healthy", healthy));

                final long unstableTimeMillis = serverHolder.syncer.getUnstableTimeMillis();
                if (unstableTimeMillis > 0L) {
                    this.serviceEmitter.emit(eventBuilder.build("serverview/sync/unstableTime", unstableTimeMillis));
                }
            });
        }
        catch (Exception e) {
            this.log.error(e, "Error while emitting server status metrics");
        }
    }

    /**
     * Reports whether the lifecycle has started, waiting at most 1 millisecond for it.
     *
     * @return the result of the bounded wait on the lifecycle lock
     */
    @Override
    public boolean isStarted() {
        final boolean started = this.lifecycleLock.awaitStarted(1L, TimeUnit.MILLISECONDS);
        return started;
    }

    /**
     * Tells whether the server registered under {@code serverKey} currently holds
     * {@code segment}, looked up by the segment's id.
     *
     * @param serverKey key of the server in the inventory
     * @param segment   segment to look for
     * @return {@code false} when the server is unknown or does not hold the segment
     */
    @Override
    public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) {
        final DruidServerHolder holder = this.servers.get(serverKey);
        if (holder == null) {
            return false;
        }
        return holder.druidServer.getSegment(segment.getId()) != null;
    }

    /**
     * Pairs a {@link DruidServer} with the {@link ChangeRequestHttpSyncer} that feeds it.
     * Sync results are applied to the server's segment set, and the segment callbacks of
     * the enclosing view are fired for every segment actually added or removed.
     */
    private class DruidServerHolder {
        private final DruidServer druidServer;
        // Set by stop(); lets the initialization wait skip servers that went away.
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        private final ChangeRequestHttpSyncer<DataSegmentChangeRequest> syncer;

        /**
         * Builds the syncer for {@code druidServer}, using the enclosing view's sync
         * executor and the server's scheme, host and port as base URL.
         *
         * @throws IAE if no valid URL can be built for the server
         */
        DruidServerHolder(DruidServer druidServer) {
            this.druidServer = druidServer;
            try {
                final HostAndPort hostAndPort = HostAndPort.fromString(druidServer.getHost());
                this.syncer = new ChangeRequestHttpSyncer<DataSegmentChangeRequest>(HttpServerInventoryView.this.smileMapper, HttpServerInventoryView.this.httpClient, HttpServerInventoryView.this.inventorySyncExecutor, new URL(druidServer.getScheme(), hostAndPort.getHostText(), hostAndPort.getPort(), "/"), "/druid-internal/v1/segments", SEGMENT_LIST_RESP_TYPE_REF, HttpServerInventoryView.this.config.getServerTimeout(), HttpServerInventoryView.this.config.getServerUnstabilityTimeout(), this.createSyncListener());
            }
            catch (MalformedURLException ex) {
                throw new IAE(ex, "Failed to construct server URL.");
            }
        }

        void start() {
            this.syncer.start();
        }

        void stop() {
            this.syncer.stop();
            this.stopped.set(true);
        }

        boolean isStopped() {
            return this.stopped.get();
        }

        boolean isSyncedSuccessfullyAtleastOnce() {
            return this.syncer.isInitialized();
        }

        /** Creates the listener that turns full and delta sync results into segment additions and removals. */
        private ChangeRequestHttpSyncer.Listener<DataSegmentChangeRequest> createSyncListener() {
            return new ChangeRequestHttpSyncer.Listener<DataSegmentChangeRequest>() {

                @Override
                public void fullSync(List<DataSegmentChangeRequest> changes) {
                    // Start with everything currently held; whatever the full sync does not mention is removed at the end.
                    // Values are typed so the removal loop below type-checks; keys are segment ids.
                    final Map<Object, DataSegment> toRemove = Maps.newHashMapWithExpectedSize(DruidServerHolder.this.druidServer.getTotalSegments());
                    DruidServerHolder.this.druidServer.iterateAllSegments().forEach(segment -> toRemove.put(segment.getId(), segment));

                    for (DataSegmentChangeRequest request : changes) {
                        if (request instanceof SegmentChangeRequestLoad) {
                            final DataSegment loaded = ((SegmentChangeRequestLoad) request).getSegment();
                            toRemove.remove(loaded.getId());
                            DruidServerHolder.this.addSegment(loaded, true);
                        } else {
                            HttpServerInventoryView.this.log.error("Server[%s] gave a non-load dataSegmentChangeRequest[%s]., Ignored.", DruidServerHolder.this.druidServer.getName(), request);
                        }
                    }

                    for (DataSegment segmentToRemove : toRemove.values()) {
                        DruidServerHolder.this.removeSegment(segmentToRemove, true);
                    }
                }

                @Override
                public void deltaSync(List<DataSegmentChangeRequest> changes) {
                    for (DataSegmentChangeRequest request : changes) {
                        if (request instanceof SegmentChangeRequestLoad) {
                            DruidServerHolder.this.addSegment(((SegmentChangeRequestLoad) request).getSegment(), false);
                        } else if (request instanceof SegmentChangeRequestDrop) {
                            DruidServerHolder.this.removeSegment(((SegmentChangeRequestDrop) request).getSegment(), false);
                        } else {
                            HttpServerInventoryView.this.log.error("Server[%s] gave a non load/drop dataSegmentChangeRequest[%s], Ignored.", DruidServerHolder.this.druidServer.getName(), request);
                        }
                    }
                }
            };
        }

        /**
         * Adds {@code segment} to the server and fires {@code segmentAdded} callbacks,
         * provided the effective predicate accepts it and the server does not already
         * hold it. An already-held segment is logged only for delta syncs.
         */
        private void addSegment(DataSegment segment, boolean fullSync) {
            if (!HttpServerInventoryView.this.finalPredicate.apply(Pair.of(this.druidServer.getMetadata(), segment))) {
                return;
            }
            if (this.druidServer.getSegment(segment.getId()) == null) {
                final DataSegment theSegment = DataSegmentInterner.intern(segment);
                this.druidServer.addDataSegment(theSegment);
                HttpServerInventoryView.this.runSegmentCallbacks(input -> input.segmentAdded(this.druidServer.getMetadata(), theSegment));
            } else if (!fullSync) {
                HttpServerInventoryView.this.log.warn("Not adding or running callbacks for existing segment[%s] on server[%s]", segment.getId(), this.druidServer.getName());
            }
        }

        /**
         * Removes {@code segment} from the server and fires {@code segmentRemoved}
         * callbacks if the server actually held it. A segment that was not held is
         * logged only for delta syncs.
         */
        private void removeSegment(final DataSegment segment, boolean fullSync) {
            if (this.druidServer.removeDataSegment(segment.getId()) != null) {
                HttpServerInventoryView.this.runSegmentCallbacks(input -> input.segmentRemoved(this.druidServer.getMetadata(), segment));
            } else if (!fullSync) {
                HttpServerInventoryView.this.log.warn("Not running cleanup or callbacks for non-existing segment[%s] on server[%s]", segment.getId(), this.druidServer.getName());
            }
        }
    }
}

