/*
 * Decompiled with CFR 0.152.
 */
package io.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerHelper;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.Segment;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.joda.time.Interval;

public class SinkQuerySegmentWalker
implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
    private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
    private final String dataSource;
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
    private final ObjectMapper objectMapper;
    private final ServiceEmitter emitter;
    private final QueryRunnerFactoryConglomerate conglomerate;
    private final ExecutorService queryExecutorService;
    private final Cache cache;
    private final CacheConfig cacheConfig;

    public SinkQuerySegmentWalker(String dataSource, VersionedIntervalTimeline<String, Sink> sinkTimeline, ObjectMapper objectMapper, ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, ExecutorService queryExecutorService, Cache cache, CacheConfig cacheConfig) {
        this.dataSource = (String)Preconditions.checkNotNull((Object)dataSource, (Object)"dataSource");
        this.sinkTimeline = (VersionedIntervalTimeline)Preconditions.checkNotNull(sinkTimeline, (Object)"sinkTimeline");
        this.objectMapper = (ObjectMapper)Preconditions.checkNotNull((Object)objectMapper, (Object)"objectMapper");
        this.emitter = (ServiceEmitter)Preconditions.checkNotNull((Object)emitter, (Object)"emitter");
        this.conglomerate = (QueryRunnerFactoryConglomerate)Preconditions.checkNotNull((Object)conglomerate, (Object)"conglomerate");
        this.queryExecutorService = (ExecutorService)Preconditions.checkNotNull((Object)queryExecutorService, (Object)"queryExecutorService");
        this.cache = (Cache)Preconditions.checkNotNull((Object)cache, (Object)"cache");
        this.cacheConfig = (CacheConfig)Preconditions.checkNotNull((Object)cacheConfig, (Object)"cacheConfig");
        if (!cache.isLocal()) {
            log.warn("Configured cache[%s] is not local, caching will not be enabled.", new Object[]{cache.getClass().getName()});
        }
    }

    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) {
        FunctionalIterable specs = FunctionalIterable.create(intervals).transformCat((Function)new Function<Interval, Iterable<TimelineObjectHolder<String, Sink>>>(){

            public Iterable<TimelineObjectHolder<String, Sink>> apply(Interval interval) {
                return SinkQuerySegmentWalker.this.sinkTimeline.lookup(interval);
            }
        }).transformCat((Function)new Function<TimelineObjectHolder<String, Sink>, Iterable<SegmentDescriptor>>(){

            public Iterable<SegmentDescriptor> apply(final TimelineObjectHolder<String, Sink> holder) {
                return FunctionalIterable.create((Iterable)holder.getObject()).transform((Function)new Function<PartitionChunk<Sink>, SegmentDescriptor>(){

                    public SegmentDescriptor apply(PartitionChunk<Sink> chunk) {
                        return new SegmentDescriptor(holder.getInterval(), (String)holder.getVersion(), chunk.getChunkNumber());
                    }
                });
            }
        });
        return this.getQueryRunnerForSegments(query, (Iterable<SegmentDescriptor>)specs);
    }

    public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs) {
        if (!(query.getDataSource() instanceof TableDataSource) || !this.dataSource.equals(((TableDataSource)query.getDataSource()).getName())) {
            log.makeAlert("Received query for unknown dataSource", new Object[0]).addData("dataSource", (Object)query.getDataSource()).emit();
            return new NoopQueryRunner();
        }
        final QueryRunnerFactory factory = this.conglomerate.findFactory(query);
        if (factory == null) {
            throw new ISE("Unknown query type[%s].", new Object[]{query.getClass()});
        }
        final QueryToolChest toolChest = factory.getToolchest();
        final Function builderFn = new Function<Query<T>, ServiceMetricEvent.Builder>(){

            public ServiceMetricEvent.Builder apply(@Nullable Query<T> input) {
                return toolChest.makeMetricBuilder(query);
            }
        };
        final boolean skipIncrementalSegment = (Boolean)query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, (Object)false);
        final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
        return CPUTimeMetricQueryRunner.safeBuild((QueryRunner)toolChest.mergeResults(factory.mergeRunners(this.queryExecutorService, (Iterable)FunctionalIterable.create(specs).transform(new Function<SegmentDescriptor, QueryRunner<T>>(){

            public QueryRunner<T> apply(final SegmentDescriptor descriptor) {
                PartitionHolder holder = SinkQuerySegmentWalker.this.sinkTimeline.findEntry(descriptor.getInterval(), (Object)descriptor.getVersion());
                if (holder == null) {
                    return new ReportTimelineMissingSegmentQueryRunner(descriptor);
                }
                PartitionChunk chunk = holder.getChunk(descriptor.getPartitionNumber());
                if (chunk == null) {
                    return new ReportTimelineMissingSegmentQueryRunner(descriptor);
                }
                Sink theSink = (Sink)chunk.getObject();
                String sinkSegmentIdentifier = theSink.getSegment().getIdentifier();
                return new SpecificSegmentQueryRunner(SinkQuerySegmentWalker.this.withPerSinkMetrics((QueryRunner)new BySegmentQueryRunner(sinkSegmentIdentifier, descriptor.getInterval().getStart(), factory.mergeRunners((ExecutorService)MoreExecutors.sameThreadExecutor(), Iterables.transform((Iterable)theSink, (Function)new Function<FireHydrant, QueryRunner<T>>(){

                    public QueryRunner<T> apply(FireHydrant hydrant) {
                        boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
                        if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
                            return new NoopQueryRunner();
                        }
                        Pair<Segment, Closeable> segment = hydrant.getAndIncrementSegment();
                        try {
                            QueryRunner baseRunner = QueryRunnerHelper.makeClosingQueryRunner((QueryRunner)factory.createRunner((Segment)segment.lhs), (Closeable)((Closeable)segment.rhs));
                            if (hydrantDefinitelySwapped && SinkQuerySegmentWalker.this.cache.isLocal()) {
                                return new CachingQueryRunner(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant), descriptor, SinkQuerySegmentWalker.this.objectMapper, SinkQuerySegmentWalker.this.cache, toolChest, baseRunner, (ExecutorService)MoreExecutors.sameThreadExecutor(), SinkQuerySegmentWalker.this.cacheConfig);
                            }
                            return baseRunner;
                        }
                        catch (RuntimeException e) {
                            CloseQuietly.close((Closeable)((Closeable)segment.rhs));
                            throw e;
                        }
                    }
                }))), builderFn, sinkSegmentIdentifier, cpuTimeAccumulator), new SpecificSegmentSpec(descriptor));
            }
        }))), (Function)builderFn, (ServiceEmitter)this.emitter, (AtomicLong)cpuTimeAccumulator, (boolean)true);
    }

    private <T> QueryRunner<T> withPerSinkMetrics(QueryRunner<T> sinkRunner, Function<Query<T>, ServiceMetricEvent.Builder> builderFn, String sinkSegmentIdentifier, AtomicLong cpuTimeAccumulator) {
        ImmutableMap dims = ImmutableMap.of((Object)"segment", (Object)sinkSegmentIdentifier);
        return CPUTimeMetricQueryRunner.safeBuild((QueryRunner)new MetricsEmittingQueryRunner(this.emitter, builderFn, (QueryRunner)new MetricsEmittingQueryRunner(this.emitter, builderFn, sinkRunner, "query/segment/time", (Map)dims), "query/segmentAndCache/time", (Map)dims).withWaitMeasuredFromNow(), builderFn, (ServiceEmitter)this.emitter, (AtomicLong)cpuTimeAccumulator, (boolean)false);
    }

    public static String makeHydrantCacheIdentifier(FireHydrant input) {
        return input.getSegment().getIdentifier() + "_" + input.getCount();
    }
}

