/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.client;

import com.google.common.base.Predicate;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DirectDruidClientFactory;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.selector.QueryableDruidServer;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;

/**
 * {@link TimelineServerView} implementation layered on a {@link FilteredServerInventoryView}.
 *
 * <p>The constructor registers a segment callback and a server-removed callback on the base view. Those
 * callbacks maintain three pieces of state:
 * <ul>
 *   <li>{@code clients}: server name to {@link QueryableDruidServer};</li>
 *   <li>{@code selectors}: {@link SegmentId} to the {@link ServerSelector} listing the servers announcing it;</li>
 *   <li>{@code timelines}: data source name to a {@link VersionedIntervalTimeline} of those selectors.</li>
 * </ul>
 *
 * <p>Every read and write of {@code selectors} and {@code timelines} happens while holding {@code lock}.
 * Callbacks registered through {@link #registerTimelineCallback} are notified of changes on the executor
 * they were registered with.
 */
@ManageLifecycle
public class BrokerServerView
implements TimelineServerView {
    private static final Logger log = new Logger(BrokerServerView.class);

    /** Guards {@link #selectors} and {@link #timelines}. */
    private final Object lock = new Object();
    /** Queryable servers keyed by server name. */
    private final ConcurrentMap<String, QueryableDruidServer> clients = new ConcurrentHashMap<String, QueryableDruidServer>();
    /** One selector per known segment; accessed only under {@link #lock}. */
    private final Map<SegmentId, ServerSelector> selectors = new HashMap<SegmentId, ServerSelector>();
    /** One timeline per data source name; accessed only under {@link #lock}. */
    private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines = new HashMap<String, VersionedIntervalTimeline<String, ServerSelector>>();
    /** Registered timeline callbacks mapped to the executor each one must run on. */
    private final ConcurrentMap<TimelineServerView.TimelineCallback, Executor> timelineCallbacks = new ConcurrentHashMap<TimelineServerView.TimelineCallback, Executor>();
    private final DirectDruidClientFactory druidClientFactory;
    private final TierSelectorStrategy tierSelectorStrategy;
    private final ServiceEmitter emitter;
    private final BrokerSegmentWatcherConfig segmentWatcherConfig;
    /** Decides which (server, segment) announcements this view pays attention to. */
    private final Predicate<Pair<DruidServerMetadata, DataSegment>> segmentFilter;
    /** Released once the base view reports that the segment view is initialized. */
    private final CountDownLatch initialized = new CountDownLatch(1);
    private final FilteredServerInventoryView baseView;

    /**
     * Validates the watcher configuration, builds the segment filter and registers the callbacks that keep
     * this view in sync with {@code baseView}.
     *
     * @param directDruidClientFactory factory used to create a client for each newly seen server
     * @param baseView                 inventory view that delivers segment and server events
     * @param tierSelectorStrategy     strategy handed to every {@link ServerSelector} created here
     * @param emitter                  emitter used for the initialization-time metric in {@link #start()}
     * @param segmentWatcherConfig     tier / data source / realtime-task watch configuration
     * @throws ISE if {@code segmentWatcherConfig} is inconsistent (see the validation messages)
     */
    @Inject
    public BrokerServerView(DirectDruidClientFactory directDruidClientFactory, FilteredServerInventoryView baseView, TierSelectorStrategy tierSelectorStrategy, ServiceEmitter emitter, BrokerSegmentWatcherConfig segmentWatcherConfig) {
        this.druidClientFactory = directDruidClientFactory;
        this.baseView = baseView;
        this.tierSelectorStrategy = tierSelectorStrategy;
        this.emitter = emitter;
        this.validateSegmentWatcherConfig(segmentWatcherConfig);
        this.segmentWatcherConfig = segmentWatcherConfig;
        this.segmentFilter = metadataAndSegment -> {
            DruidServerMetadata metadata = metadataAndSegment.lhs;
            // A configured watch list admits only the tiers it names.
            if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers().contains(metadata.getTier())) {
                return false;
            }
            // A configured ignore list rejects the tiers it names.
            if (segmentWatcherConfig.getIgnoredTiers() != null && segmentWatcherConfig.getIgnoredTiers().contains(metadata.getTier())) {
                return false;
            }
            // A configured data source list admits only the data sources it names.
            if (segmentWatcherConfig.getWatchedDataSources() != null && !segmentWatcherConfig.getWatchedDataSources().contains(metadataAndSegment.rhs.getDataSource())) {
                return false;
            }
            // INDEXER_EXECUTOR servers are admitted only when realtime tasks are watched.
            return metadata.getType() != ServerType.INDEXER_EXECUTOR || segmentWatcherConfig.isWatchRealtimeTasks();
        };
        // Both callbacks below are handed the same executor.
        ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s");
        baseView.registerSegmentCallback(exec, new ServerView.SegmentCallback(){

            @Override
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                BrokerServerView.this.serverAddedSegment(server, segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) {
                BrokerServerView.this.serverRemovedSegment(server, segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentViewInitialized() {
                // Release waiters first, then tell timeline callbacks.
                BrokerServerView.this.initialized.countDown();
                BrokerServerView.this.runTimelineCallbacks(TimelineServerView.TimelineCallback::timelineInitialized);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                // Schema announcements are not used by this view.
                return ServerView.CallbackAction.CONTINUE;
            }
        }, this.segmentFilter);
        baseView.registerServerRemovedCallback(exec, server -> {
            this.removeServer(server);
            return ServerView.CallbackAction.CONTINUE;
        });
    }

    /**
     * Lifecycle start hook. When the configuration asks for it, blocks until the view is initialized, then
     * logs and emits the elapsed time as the {@code serverview/init/time} metric.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @LifecycleStart
    public void start() throws InterruptedException {
        if (!this.segmentWatcherConfig.isAwaitInitializationOnStart()) {
            return;
        }
        long startMillis = System.currentTimeMillis();
        log.info("BrokerServerView waiting for initialization.");
        this.awaitInitialization();
        long elapsedMillis = System.currentTimeMillis() - startMillis;
        log.info("BrokerServerView initialized in [%,d] ms.", elapsedMillis);
        this.emitter.emit(ServiceMetricEvent.builder().setMetric("serverview/init/time", elapsedMillis));
    }

    /**
     * @return {@code true} once the base view has signalled that the segment view is initialized
     */
    public boolean isInitialized() {
        return this.initialized.getCount() == 0L;
    }

    /**
     * Blocks until the base view has signalled that the segment view is initialized.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void awaitInitialization() throws InterruptedException {
        this.initialized.await();
    }

    /**
     * Rejects configurations that set both the watched and the ignored tier lists, or that set either list
     * to an empty collection.
     *
     * @throws ISE describing the violated rule
     */
    private void validateSegmentWatcherConfig(BrokerSegmentWatcherConfig watcherConfig) {
        if (watcherConfig.getWatchedTiers() != null && watcherConfig.getIgnoredTiers() != null) {
            throw new ISE("At most one of 'druid.broker.segment.watchedTiers' and 'druid.broker.segment.ignoredTiers' can be configured.");
        }
        if (watcherConfig.getWatchedTiers() != null && watcherConfig.getWatchedTiers().isEmpty()) {
            throw new ISE("If configured, 'druid.broker.segment.watchedTiers' must be non-empty");
        }
        if (watcherConfig.getIgnoredTiers() != null && watcherConfig.getIgnoredTiers().isEmpty()) {
            throw new ISE("If configured, 'druid.broker.segment.ignoredTiers' must be non-empty");
        }
    }

    /**
     * Creates a queryable wrapper (with a fresh direct client) for {@code server} and stores it under the
     * server's name, replacing and warning about any previous entry.
     *
     * @return the newly stored wrapper
     */
    private QueryableDruidServer addServer(DruidServer server) {
        QueryableDruidServer<DirectDruidClient> retVal = new QueryableDruidServer<DirectDruidClient>(server, this.druidClientFactory.makeDirectClient(server));
        if (this.clients.put(server.getName(), retVal) != null) {
            log.warn("QueryRunner for server[%s] already exists!? Well it's getting replaced", server);
        }
        return retVal;
    }

    /**
     * Processes a removal for every segment the server currently lists, then drops the server's client.
     *
     * @return the removed wrapper, or {@code null} if the server was not known
     */
    private QueryableDruidServer removeServer(DruidServer server) {
        for (DataSegment segment : server.iterateAllSegments()) {
            this.serverRemovedSegment(server.getMetadata(), segment);
        }
        return this.clients.remove(server.getName());
    }

    /**
     * Handles a "segment added" event from the base view.
     *
     * <p>For non-broker servers the announcing server is attached to the segment's {@link ServerSelector},
     * creating the selector and the data source timeline on first sight. Announcements from brokers are not
     * stored. In both cases the timeline callbacks receive {@code segmentAdded}, except when the announcing
     * server cannot be resolved, in which case the event is skipped entirely.
     */
    private void serverAddedSegment(DruidServerMetadata server, DataSegment segment) {
        SegmentId segmentId = segment.getId();
        synchronized (this.lock) {
            if (!server.getType().equals(ServerType.BROKER)) {
                log.debug("Adding segment[%s] for server[%s]", segment, server);

                // Resolve the server BEFORE touching selectors/timelines. If the server is unknown we bail
                // out, and registering the selector first would leave an empty selector (and possibly an
                // empty timeline) visible to getTimeline() for a segment nobody is known to serve.
                QueryableDruidServer queryableDruidServer = this.clients.get(server.getName());
                if (queryableDruidServer == null) {
                    DruidServer inventoryValue = this.baseView.getInventoryValue(server.getName());
                    if (inventoryValue == null) {
                        log.warn("Could not find server[%s] in inventory. Skipping addition of segment[%s].", server.getName(), segmentId);
                        return;
                    }
                    queryableDruidServer = this.addServer(inventoryValue);
                }

                ServerSelector selector = this.selectors.get(segmentId);
                if (selector == null) {
                    selector = new ServerSelector(segment, this.tierSelectorStrategy);
                    VersionedIntervalTimeline<String, ServerSelector> timeline = this.timelines.get(segment.getDataSource());
                    if (timeline == null) {
                        // NOTE(review): the meaning of the boolean flag is defined by
                        // VersionedIntervalTimeline's constructor - confirm against its documentation.
                        timeline = new VersionedIntervalTimeline<>(Ordering.natural(), true);
                        this.timelines.put(segment.getDataSource(), timeline);
                    }
                    timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
                    this.selectors.put(segmentId, selector);
                }
                selector.addServerAndUpdateSegment(queryableDruidServer, segment);
            }
            // Callbacks are notified for broker announcements too; only the storage step is skipped for them.
            this.runTimelineCallbacks(callback -> callback.segmentAdded(server, segment));
        }
    }

    /**
     * Handles a "segment removed" event from the base view.
     *
     * <p>Broker removals only trigger {@code serverSegmentRemoved}. For other servers the server is detached
     * from the segment's selector (firing {@code serverSegmentRemoved} on success); when the selector ends up
     * empty it is dropped from {@code selectors} and from the timeline, and {@code segmentRemoved} is fired
     * if the timeline actually contained the entry.
     */
    private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) {
        SegmentId segmentId = segment.getId();
        synchronized (this.lock) {
            log.debug("Removing segment[%s] from server[%s].", segmentId, server);

            // Broker segments are never stored here, so there is nothing to remove from the timeline.
            if (server.getType().equals(ServerType.BROKER)) {
                this.runTimelineCallbacks(callback -> callback.serverSegmentRemoved(server, segment));
                return;
            }

            ServerSelector selector = this.selectors.get(segmentId);
            if (selector == null) {
                log.warn("Told to remove non-existant segment[%s]", segmentId);
                return;
            }

            QueryableDruidServer queryableDruidServer = this.clients.get(server.getName());
            if (queryableDruidServer == null) {
                log.warn("Could not find server[%s] in inventory. Skipping removal of segment[%s].", server.getName(), segmentId);
            } else if (!selector.removeServer(queryableDruidServer)) {
                log.warn("Asked to disassociate non-existant association between server[%s] and segment[%s]", server, segmentId);
            } else {
                this.runTimelineCallbacks(callback -> callback.serverSegmentRemoved(server, segment));
            }

            // The emptiness check runs even when the server was unknown or not associated.
            if (selector.isEmpty()) {
                VersionedIntervalTimeline<String, ServerSelector> timeline = this.timelines.get(segment.getDataSource());
                this.selectors.remove(segmentId);
                PartitionChunk<ServerSelector> removedPartition = timeline.remove(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
                if (removedPartition == null) {
                    log.warn("Asked to remove timeline entry[interval: %s, version: %s] that doesn't exist", segment.getInterval(), segment.getVersion());
                } else {
                    this.runTimelineCallbacks(callback -> callback.segmentRemoved(segment));
                }
            }
        }
    }

    /**
     * Looks up the timeline for the base table of {@code analysis}.
     *
     * @return the timeline for that table's name, or an empty Optional if none has been created
     * @throws ISE if the analysis has no base table data source
     */
    public Optional<VersionedIntervalTimeline<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis) {
        TableDataSource table = analysis.getBaseTableDataSource().orElseThrow(() -> new ISE("Cannot handle base datasource: %s", analysis.getBaseDataSource()));
        synchronized (this.lock) {
            return Optional.ofNullable(this.timelines.get(table.getName()));
        }
    }

    /**
     * Registers {@code callback}; its notifications are submitted to {@code exec}. Registering the same
     * callback again replaces the executor associated with it.
     */
    @Override
    public void registerTimelineCallback(Executor exec, TimelineServerView.TimelineCallback callback) {
        this.timelineCallbacks.put(callback, exec);
    }

    /**
     * @return the query runner of the client stored under {@code server}'s name, or {@code null} (after
     *         logging an error) when no client is stored for that name
     */
    @Override
    public <T> QueryRunner<T> getQueryRunner(DruidServer server) {
        synchronized (this.lock) {
            QueryableDruidServer queryableDruidServer = this.clients.get(server.getName());
            if (queryableDruidServer == null) {
                log.error("No QueryRunner found for server name[%s].", server.getName());
                return null;
            }
            return queryableDruidServer.getQueryRunner();
        }
    }

    /** Delegates directly to the base view. */
    @Override
    public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback) {
        this.baseView.registerServerRemovedCallback(exec, callback);
    }

    /** Delegates to the base view, applying this view's segment filter to the callback. */
    @Override
    public void registerSegmentCallback(Executor exec, ServerView.SegmentCallback callback) {
        this.baseView.registerSegmentCallback(exec, callback, this.segmentFilter);
    }

    /**
     * Submits {@code function} to every registered timeline callback on that callback's executor. A callback
     * whose invocation returns {@code UNREGISTER} is removed from the registry.
     */
    private void runTimelineCallbacks(Function<TimelineServerView.TimelineCallback, ServerView.CallbackAction> function) {
        for (Map.Entry<TimelineServerView.TimelineCallback, Executor> entry : this.timelineCallbacks.entrySet()) {
            entry.getValue().execute(() -> {
                if (ServerView.CallbackAction.UNREGISTER == function.apply(entry.getKey())) {
                    this.timelineCallbacks.remove(entry.getKey());
                }
            });
        }
    }

    /**
     * @return immutable snapshots of every server that currently has a client in this view
     */
    @Override
    public List<ImmutableDruidServer> getDruidServers() {
        return this.clients.values().stream().map(queryableDruidServer -> queryableDruidServer.getServer().toImmutableDruidServer()).collect(Collectors.toList());
    }
}

