/*
 * Decompiled with CFR 0.152.
 */
package io.druid.segment.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Plumbers;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.joda.time.Interval;

public class RealtimeManager
implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(RealtimeManager.class);
    private final List<FireDepartment> fireDepartments;
    private final QueryRunnerFactoryConglomerate conglomerate;
    private final DataSegmentServerAnnouncer serverAnnouncer;
    private final Map<String, Map<Integer, FireChief>> chiefs;
    private ExecutorService fireChiefExecutor;
    private boolean stopping;

    @Inject
    public RealtimeManager(List<FireDepartment> fireDepartments, QueryRunnerFactoryConglomerate conglomerate, DataSegmentServerAnnouncer serverAnnouncer) {
        this(fireDepartments, conglomerate, serverAnnouncer, Maps.newHashMap());
    }

    @VisibleForTesting
    RealtimeManager(List<FireDepartment> fireDepartments, QueryRunnerFactoryConglomerate conglomerate, DataSegmentServerAnnouncer serverAnnouncer, Map<String, Map<Integer, FireChief>> chiefs) {
        this.fireDepartments = fireDepartments;
        this.conglomerate = conglomerate;
        this.serverAnnouncer = serverAnnouncer;
        this.chiefs = chiefs == null ? Maps.newHashMap() : Maps.newHashMap(chiefs);
    }

    @VisibleForTesting
    Map<Integer, FireChief> getFireChiefs(String dataSource) {
        return this.chiefs.get(dataSource);
    }

    @LifecycleStart
    public void start() throws IOException {
        this.serverAnnouncer.announce();
        this.fireChiefExecutor = Execs.multiThreaded((int)this.fireDepartments.size(), (String)"chief-%d");
        for (FireDepartment fireDepartment : this.fireDepartments) {
            DataSchema schema = fireDepartment.getDataSchema();
            FireChief chief = new FireChief(fireDepartment, this.conglomerate);
            this.chiefs.computeIfAbsent(schema.getDataSource(), k -> new HashMap()).put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief);
            this.fireChiefExecutor.submit(chief);
        }
    }

    @LifecycleStop
    public void stop() {
        this.stopping = true;
        try {
            if (this.fireChiefExecutor != null) {
                this.fireChiefExecutor.shutdownNow();
                Preconditions.checkState((boolean)this.fireChiefExecutor.awaitTermination(10L, TimeUnit.SECONDS), (Object)"persistExecutor not terminated");
            }
        }
        catch (InterruptedException e) {
            throw new ISE((Throwable)e, "Failed to shutdown fireChiefExecutor during stop()", new Object[0]);
        }
        this.serverAnnouncer.unannounce();
    }

    public FireDepartmentMetrics getMetrics(String datasource) {
        Map<Integer, FireChief> chiefs = this.chiefs.get(datasource);
        if (chiefs == null) {
            return null;
        }
        FireDepartmentMetrics snapshot = null;
        for (FireChief chief : chiefs.values()) {
            if (snapshot == null) {
                snapshot = chief.getMetrics().snapshot();
                continue;
            }
            snapshot.merge(chief.getMetrics());
        }
        return snapshot;
    }

    public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> intervals) {
        QueryRunnerFactory factory = this.conglomerate.findFactory(query);
        Map<Integer, FireChief> partitionChiefs = this.chiefs.get(Iterables.getOnlyElement((Iterable)query.getDataSource().getNames()));
        return partitionChiefs == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults(factory.mergeRunners((ExecutorService)MoreExecutors.sameThreadExecutor(), Iterables.transform(partitionChiefs.values(), (Function)new Function<FireChief, QueryRunner<T>>(){

            public QueryRunner<T> apply(FireChief fireChief) {
                return fireChief.getQueryRunner(query);
            }
        })));
    }

    public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs) {
        QueryRunnerFactory factory = this.conglomerate.findFactory(query);
        final Map<Integer, FireChief> partitionChiefs = this.chiefs.get(Iterables.getOnlyElement((Iterable)query.getDataSource().getNames()));
        return partitionChiefs == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults(factory.mergeRunners((ExecutorService)MoreExecutors.sameThreadExecutor(), Iterables.transform(specs, (Function)new Function<SegmentDescriptor, QueryRunner<T>>(){

            public QueryRunner<T> apply(SegmentDescriptor spec) {
                FireChief retVal = (FireChief)partitionChiefs.get(spec.getPartitionNumber());
                return retVal == null ? new QueryRunner() : retVal.getQueryRunner(query.withQuerySegmentSpec((QuerySegmentSpec)new SpecificSegmentSpec(spec)));
            }
        })));
    }

    class FireChief
    implements Runnable {
        private final FireDepartment fireDepartment;
        private final FireDepartmentMetrics metrics;
        private final RealtimeTuningConfig config;
        private final QueryRunnerFactoryConglomerate conglomerate;
        private Plumber plumber;

        FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate) {
            this.fireDepartment = fireDepartment;
            this.conglomerate = conglomerate;
            this.config = fireDepartment.getTuningConfig();
            this.metrics = fireDepartment.getMetrics();
        }

        private Firehose initFirehose() {
            try {
                log.info("Calling the FireDepartment and getting a Firehose.", new Object[0]);
                return this.fireDepartment.connect();
            }
            catch (IOException e) {
                throw Throwables.propagate((Throwable)e);
            }
        }

        private FirehoseV2 initFirehoseV2(Object metaData) {
            try {
                log.info("Calling the FireDepartment and getting a FirehoseV2.", new Object[0]);
                return this.fireDepartment.connect(metaData);
            }
            catch (IOException e) {
                throw Throwables.propagate((Throwable)e);
            }
        }

        private void initPlumber() {
            log.info("Someone get us a plumber!", new Object[0]);
            this.plumber = this.fireDepartment.findPlumber();
        }

        @VisibleForTesting
        Plumber getPlumber() {
            return this.plumber;
        }

        public FireDepartmentMetrics getMetrics() {
            return this.metrics;
        }

        @Override
        public void run() {
            this.initPlumber();
            try (Closer closer = Closer.create();){
                boolean success;
                Object metadata = this.plumber.startJob();
                if (this.fireDepartment.checkFirehoseV2()) {
                    FirehoseV2 firehoseV2 = this.initFirehoseV2(metadata);
                    closer.register((Closeable)firehoseV2);
                    success = this.runFirehoseV2(firehoseV2);
                } else {
                    Firehose firehose = this.initFirehose();
                    closer.register((Closeable)firehose);
                    success = this.runFirehose(firehose);
                }
                if (success) {
                    closer.register(() -> this.plumber.finishJob());
                }
            }
            catch (IOException e) {
                throw Throwables.propagate((Throwable)e);
            }
        }

        private boolean runFirehoseV2(FirehoseV2 firehose) throws Exception {
            firehose.start();
            log.info("FirehoseV2 started", new Object[0]);
            Supplier<Committer> committerSupplier = Committers.supplierFromFirehoseV2(firehose);
            boolean haveRow = true;
            while (haveRow) {
                if (Thread.interrupted() || RealtimeManager.this.stopping) {
                    return false;
                }
                InputRow inputRow = null;
                int numRows = 0;
                try {
                    inputRow = firehose.currRow();
                    if (inputRow != null) {
                        numRows = this.plumber.add(inputRow, committerSupplier);
                        if (numRows < 0) {
                            this.metrics.incrementThrownAway();
                            log.debug("Throwing away event[%s]", new Object[]{inputRow});
                        } else {
                            this.metrics.incrementProcessed();
                        }
                    } else {
                        log.debug("thrown away null input row, considering unparseable", new Object[0]);
                        this.metrics.incrementUnparseable();
                    }
                }
                catch (Exception e) {
                    log.makeAlert((Throwable)e, "Unknown exception, Ignoring and continuing.", new Object[0]).addData("inputRow", (Object)inputRow).emit();
                }
                try {
                    haveRow = firehose.advance();
                }
                catch (Exception e) {
                    log.debug((Throwable)e, "exception in firehose.advance(), considering unparseable row", new Object[0]);
                    this.metrics.incrementUnparseable();
                }
            }
            return true;
        }

        private boolean runFirehose(Firehose firehose) {
            Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
            while (firehose.hasMore()) {
                if (Thread.interrupted() || RealtimeManager.this.stopping) {
                    return false;
                }
                Plumbers.addNextRow(committerSupplier, firehose, this.plumber, this.config.isReportParseExceptions(), this.metrics);
            }
            return true;
        }

        public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
            QueryRunnerFactory factory = this.conglomerate.findFactory(query);
            QueryToolChest toolChest = factory.getToolchest();
            return new FinalizeResultsQueryRunner(this.plumber.getQueryRunner(query), toolChest);
        }
    }
}

