/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerHelper;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.segment.realtime.plumber.SinkSegmentReference;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

/**
 * {@link QuerySegmentWalker} that builds query runners over the {@link Sink}s held in a
 * {@link VersionedIntervalTimeline} for a single datasource.
 *
 * <p>Each requested segment descriptor is resolved to a sink in the timeline; one runner is created per
 * segment reference acquired from that sink, and those runners are merged per sink and then across sinks.
 */
public class SinkQuerySegmentWalker implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
    /** Query-context key; when true it is forwarded to {@link Sink#acquireSegmentReferences}. */
    private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";

    private final String dataSource;
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
    private final ObjectMapper objectMapper;
    private final ServiceEmitter emitter;
    private final QueryRunnerFactoryConglomerate conglomerate;
    private final QueryProcessingPool queryProcessingPool;
    // Stored as given (no null check) and not read anywhere in this class.
    private final JoinableFactoryWrapper joinableFactoryWrapper;
    private final Cache cache;
    private final CacheConfig cacheConfig;
    private final CachePopulatorStats cachePopulatorStats;
    /**
     * Maps the descriptor of a newer version of a pending segment to the descriptor of its base pending
     * segment. Populated by {@link #registerNewVersionOfPendingSegment} and consulted when resolving
     * descriptors in {@link #getQueryRunnerForSegments}.
     */
    private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor> newIdToBasePendingSegment =
        new ConcurrentHashMap<>();

    /**
     * Creates a walker for the given datasource.
     *
     * <p>Every argument except {@code joinableFactoryWrapper} must be non-null. A warning is logged when
     * {@code cache} is not local, because hydrant-level caching is only applied for local caches.
     *
     * @throws NullPointerException if any null-checked argument is null
     */
    public SinkQuerySegmentWalker(
        final String dataSource,
        final VersionedIntervalTimeline<String, Sink> sinkTimeline,
        final ObjectMapper objectMapper,
        final ServiceEmitter emitter,
        final QueryRunnerFactoryConglomerate conglomerate,
        final QueryProcessingPool queryProcessingPool,
        final JoinableFactoryWrapper joinableFactoryWrapper,
        final Cache cache,
        final CacheConfig cacheConfig,
        final CachePopulatorStats cachePopulatorStats
    ) {
        this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
        this.sinkTimeline = Preconditions.checkNotNull(sinkTimeline, "sinkTimeline");
        this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.emitter = Preconditions.checkNotNull(emitter, "emitter");
        this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
        this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool");
        this.joinableFactoryWrapper = joinableFactoryWrapper;
        this.cache = Preconditions.checkNotNull(cache, "cache");
        this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
        this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");

        if (!cache.isLocal()) {
            log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName());
        }
    }

    /**
     * Builds a runner for every sink chunk the timeline returns for the given intervals.
     *
     * <p>Each timeline holder found by {@code sinkTimeline.lookup} is expanded into one
     * {@link SegmentDescriptor} per partition chunk (holder interval, holder version, chunk number), and
     * the resulting descriptors are handed to {@link #getQueryRunnerForSegments}.
     *
     * @param query     the query to run
     * @param intervals intervals to look up in the sink timeline
     * @return a runner over all matching sinks
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals) {
        final Iterable<SegmentDescriptor> specs = FunctionalIterable
            .create(intervals)
            .transformCat(sinkTimeline::lookup)
            .transformCat(
                holder -> FunctionalIterable
                    .create(holder.getObject())
                    .transform(
                        chunk -> new SegmentDescriptor(
                            holder.getInterval(),
                            holder.getVersion(),
                            chunk.getChunkNumber()
                        )
                    )
            );

        return getQueryRunnerForSegments(query, specs);
    }

    /**
     * Builds a runner for the given segment descriptors.
     *
     * <p>For each descriptor (after remapping through {@code newIdToBasePendingSegment}):
     * <ul>
     *   <li>if no chunk is found in the timeline, or the sink returns {@code null} segment references, a
     *       {@link ReportTimelineMissingSegmentQueryRunner} is used;</li>
     *   <li>if the sink returns an empty list of references, a {@link NoopQueryRunner} is used;</li>
     *   <li>otherwise one runner per segment reference is created, merged, wrapped with per-sink metrics
     *       and paired with a closer that closes all acquired references.</li>
     * </ul>
     *
     * @param query the query to run
     * @param specs descriptors of the segments to query
     * @return a runner that merges and finalizes the per-segment results
     * @throws ISE if the query's base table datasource is not this walker's datasource, if no runner
     *             factory exists for the query type, or if the query wraps a subquery the tool chest
     *             cannot perform
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs) {
        final DataSource dataSourceFromQuery = query.getDataSource();
        final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();

        // This walker serves exactly one datasource; reject queries whose base table is a different one.
        if (!analysis.getBaseTableDataSource().filter(ds -> dataSource.equals(ds.getName())).isPresent()) {
            throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
        }

        final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
        if (factory == null) {
            throw new ISE("Unknown query type[%s].", query.getClass());
        }

        final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
        final boolean skipIncrementalSegment = query.context().getBoolean(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
        final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);

        // If the query wraps a subquery, the tool chest must be able to perform it.
        if (dataSourceFromQuery instanceof QueryDataSource
            && !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
            throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
        }

        // Created once per query and shared by every sink below.
        final Function<SegmentReference, SegmentReference> segmentMapFn =
            dataSourceFromQuery.createSegmentMapFunction(query, cpuTimeAccumulator);

        // Computed once here so it is not recomputed for every segment.
        final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey());

        final Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
            specs,
            newDescriptor -> {
                // A descriptor registered as a newer version of a pending segment is resolved to its base.
                final SegmentDescriptor descriptor =
                    newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
                final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
                    descriptor.getInterval(),
                    descriptor.getVersion(),
                    descriptor.getPartitionNumber()
                );

                if (chunk == null) {
                    return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
                }

                final Sink theSink = chunk.getObject();
                final SegmentId sinkSegmentId = theSink.getSegment().getId();
                final List<SinkSegmentReference> segmentReferences =
                    theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);

                if (segmentReferences == null) {
                    // No references could be obtained from the sink: report the whole segment as missing.
                    return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
                }
                if (segmentReferences.isEmpty()) {
                    return new NoopQueryRunner<>();
                }

                final Closeable releaser = () -> CloseableUtils.closeAll(segmentReferences);

                try {
                    final Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
                        Iterables.transform(
                            segmentReferences,
                            segmentReference -> {
                                // Declared as QueryRunner: it is only a CachingQueryRunner when wrapped below.
                                QueryRunner<T> runner = factory.createRunner(segmentReference.getSegment());

                                // Cache only immutable segment references, and only with a local cache.
                                if (segmentReference.isImmutable() && cache.isLocal()) {
                                    final StorageAdapter storageAdapter =
                                        segmentReference.getSegment().asStorageAdapter();
                                    final long segmentMinTime = storageAdapter.getMinTime().getMillis();
                                    final long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
                                    // +1 ms so the interval end lies past the max timestamp.
                                    final Interval actualDataInterval =
                                        Intervals.utc(segmentMinTime, segmentMaxTime + 1L);
                                    runner = new CachingQueryRunner<>(
                                        makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
                                        cacheKeyPrefix,
                                        descriptor,
                                        actualDataInterval,
                                        objectMapper,
                                        cache,
                                        toolChest,
                                        runner,
                                        // A ForegroundCachePopulator is always used here.
                                        new ForegroundCachePopulator(
                                            objectMapper,
                                            cachePopulatorStats,
                                            cacheConfig.getMaxEntrySize()
                                        ),
                                        cacheConfig
                                    );
                                }
                                return new Pair<>(segmentReference.getSegment().getDataInterval(), runner);
                            }
                        )
                    );
                    return QueryRunnerHelper.makeClosingQueryRunner(
                        new SpecificSegmentQueryRunner<>(
                            withPerSinkMetrics(
                                new BySegmentQueryRunner<>(
                                    sinkSegmentId,
                                    descriptor.getInterval().getStart(),
                                    factory.mergeRunners(DirectQueryProcessingPool.INSTANCE, perHydrantRunners)
                                ),
                                toolChest,
                                sinkSegmentId,
                                cpuTimeAccumulator
                            ),
                            new SpecificSegmentSpec(descriptor)
                        ),
                        releaser
                    );
                }
                catch (Throwable e) {
                    // Building the runner failed: close the acquired references before propagating.
                    throw CloseableUtils.closeAndWrapInCatch(e, releaser);
                }
            }
        );

        final QueryRunner<T> mergedRunner =
            toolChest.mergeResults(factory.mergeRunners(queryProcessingPool, perSegmentRunners));

        return CPUTimeMetricQueryRunner.safeBuild(
            new FinalizeResultsQueryRunner<>(mergedRunner, toolChest),
            toolChest,
            emitter,
            cpuTimeAccumulator,
            true
        );
    }

    /**
     * Records that {@code newSegmentVersion} should be resolved to {@code basePendingSegment} when it
     * appears as a descriptor in {@link #getQueryRunnerForSegments}.
     *
     * @param basePendingSegment the base pending segment
     * @param newSegmentVersion  the newer version whose descriptor maps to the base
     */
    public void registerNewVersionOfPendingSegment(
        final SegmentIdWithShardSpec basePendingSegment,
        final SegmentIdWithShardSpec newSegmentVersion
    ) {
        newIdToBasePendingSegment.put(
            newSegmentVersion.asSegmentId().toDescriptor(),
            basePendingSegment.asSegmentId().toDescriptor()
        );
    }

    /** Returns the name of the datasource this walker serves. */
    @VisibleForTesting
    String getDataSource() {
        return dataSource;
    }

    /**
     * Wraps a per-sink runner in two nested {@link MetricsEmittingQueryRunner}s (segment time inside,
     * segment-and-cache time outside, both tagged with the sink's segment id) and then in a
     * {@link CPUTimeMetricQueryRunner} built with its final boolean argument set to {@code false}.
     */
    private <T> QueryRunner<T> withPerSinkMetrics(
        final QueryRunner<T> sinkRunner,
        final QueryToolChest<T, ? extends Query<T>> queryToolChest,
        final SegmentId sinkSegmentId,
        final AtomicLong cpuTimeAccumulator
    ) {
        // Stringify once; both metric decorators below capture the same value.
        final String sinkSegmentIdString = sinkSegmentId.toString();
        return CPUTimeMetricQueryRunner.safeBuild(
            new MetricsEmittingQueryRunner<>(
                emitter,
                queryToolChest,
                new MetricsEmittingQueryRunner<>(
                    emitter,
                    queryToolChest,
                    sinkRunner,
                    QueryMetrics::reportSegmentTime,
                    queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
                ),
                QueryMetrics::reportSegmentAndCacheTime,
                queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
            ).withWaitMeasuredFromNow(),
            queryToolChest,
            emitter,
            cpuTimeAccumulator,
            false
        );
    }

    /** Returns the sink timeline this walker reads from (the same instance, not a copy). */
    public VersionedIntervalTimeline<String, Sink> getSinkTimeline() {
        return sinkTimeline;
    }

    /** Builds the cache identifier for a hydrant from its segment id and count. */
    public static String makeHydrantCacheIdentifier(final FireHydrant hydrant) {
        return makeHydrantCacheIdentifier(hydrant.getSegmentId(), hydrant.getCount());
    }

    /**
     * Builds a hydrant cache identifier of the form {@code <segmentId>_H<hydrantNumber>}.
     *
     * @param segmentId     id of the sink's segment
     * @param hydrantNumber number of the hydrant within the sink
     */
    public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final int hydrantNumber) {
        return segmentId + "_H" + hydrantNumber;
    }
}

