/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.realtime.plumber;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.common.guava.ThreadRenamingCallable;
import org.apache.druid.common.guava.ThreadRenamingRunnable;
import org.apache.druid.concurrent.TaskThreadPriority;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.BaseProgressIndicator;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.SegmentPublisher;
import org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker;
import org.apache.druid.segment.realtime.plumber.Plumber;
import org.apache.druid.segment.realtime.plumber.RejectionPolicy;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.segment.realtime.plumber.VersioningPolicy;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
import org.apache.druid.utils.JvmUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadablePeriod;

public class RealtimePlumber
implements Plumber {
    // Class-wide logger; also used to emit alerts through makeAlert().
    private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
    // NOTE(review): presumably the back-pressure threshold (millis) behind the throttling warning in
    // persist(), which compares against the same value (1000) -- confirm.
    private static final int WARN_DELAY = 1000;
    // Schema and tuning config supplied to the constructor.
    private final DataSchema schema;
    private final RealtimeTuningConfig config;
    // Created in the constructor from the config's rejection policy factory and window period;
    // getSink() asks it whether a row timestamp is accepted.
    private final RejectionPolicy rejectionPolicy;
    private final FireDepartmentMetrics metrics;
    private final DataSegmentAnnouncer segmentAnnouncer;
    private final DataSegmentPusher dataSegmentPusher;
    private final SegmentPublisher segmentPublisher;
    private final SegmentHandoffNotifier handoffNotifier;
    // Monitor that finishJob() waits on and abandonSegment() notifies after removing a sink.
    private final Object handoffCondition = new Object();
    // Active sinks keyed by the start millis of their interval (see addSink()).
    private final ConcurrentMap<Long, Sink> sinks = new ConcurrentHashMap<Long, Sink>();
    // Timeline view of the same sinks; updated by addSink()/abandonSegment() and handed to the query walker.
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline(String.CASE_INSENSITIVE_ORDER);
    // Query walker built over sinkTimeline; getQueryRunner() delegates to it.
    private final QuerySegmentWalker texasRanger;
    private final Cache cache;
    // Epoch millis after which add() triggers a persist; recomputed by resetNextFlush().
    private volatile long nextFlush = 0L;
    // Set by finishJob(); read by the persist-n-merge failure path.
    private volatile boolean shuttingDown = false;
    // Set at the end of finishJob(); makes the overseer callable return STOP.
    private volatile boolean stopped = false;
    // Cleared when persist-n-merge fails while shutting down; finishJob() throws if it is false.
    private volatile boolean cleanShutdown = true;
    // Executors are created lazily by initializeExecutors() and stopped by shutdownExecutors().
    private volatile ExecutorService persistExecutor = null;
    private volatile ExecutorService mergeExecutor = null;
    private volatile ScheduledExecutorService scheduledExecutor = null;
    // Both are null-checked in the constructor.
    private volatile IndexMerger indexMerger;
    private volatile IndexIO indexIO;
    // Keys of the metadata map built in persist() and read back by bootstrapSinksFromDisk().
    private static final String COMMIT_METADATA_KEY = "%commitMetadata%";
    private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%";

    public RealtimePlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics, ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, SegmentHandoffNotifier handoffNotifier, IndexMerger indexMerger, IndexIO indexIO, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats, ObjectMapper objectMapper) {
        this.schema = schema;
        this.config = config;
        this.rejectionPolicy = config.getRejectionPolicyFactory().create(config.getWindowPeriod());
        this.metrics = metrics;
        this.segmentAnnouncer = segmentAnnouncer;
        this.dataSegmentPusher = dataSegmentPusher;
        this.segmentPublisher = segmentPublisher;
        this.handoffNotifier = handoffNotifier;
        this.indexMerger = (IndexMerger)Preconditions.checkNotNull((Object)indexMerger, (Object)"Null IndexMerger");
        this.indexIO = (IndexIO)Preconditions.checkNotNull((Object)indexIO, (Object)"Null IndexIO");
        this.cache = cache;
        this.texasRanger = new SinkQuerySegmentWalker(schema.getDataSource(), this.sinkTimeline, objectMapper, emitter, conglomerate, queryProcessingPool, new JoinableFactoryWrapper(joinableFactory), cache, cacheConfig, cachePopulatorStats);
        log.info("Creating plumber using rejectionPolicy[%s]", new Object[]{this.getRejectionPolicy()});
    }

    /**
     * @return the data schema this plumber was constructed with
     */
    public DataSchema getSchema() {
        return schema;
    }

    /**
     * @return the tuning config this plumber was constructed with
     */
    public RealtimeTuningConfig getConfig() {
        return config;
    }

    /**
     * @return the rejection policy created in the constructor from the tuning config
     */
    public RejectionPolicy getRejectionPolicy() {
        return rejectionPolicy;
    }

    /**
     * @return the live (not copied) map of sinks, keyed by the start millis of each sink's interval
     */
    public Map<Long, Sink> getSinks() {
        return sinks;
    }

    /**
     * Prepares the plumber for ingestion: creates the base persist directory, initializes the
     * executors, starts the handoff notifier, reloads sinks persisted on disk, schedules the
     * overseer, and runs an initial merge-and-push pass.
     *
     * @return the commit metadata recovered by {@link #bootstrapSinksFromDisk()}, possibly null
     * @throws RuntimeException wrapping the {@link IOException} if the base directory cannot be created
     */
    @Override
    public Object startJob() {
        final File baseDir = computeBaseDir(schema);
        try {
            org.apache.druid.java.util.common.FileUtils.mkdirp(baseDir);
        }
        catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }

        initializeExecutors();
        handoffNotifier.start();
        final Object restoredMetadata = bootstrapSinksFromDisk();
        startPersistThread();
        // Hand off anything restored from disk that is already outside the window.
        mergeAndPush();
        resetNextFlush();
        return restoredMetadata;
    }

    /**
     * Adds a row to the sink covering its timestamp, persisting afterwards when the sink cannot
     * take more rows or the next scheduled flush time has passed.
     *
     * @param row               the row to ingest
     * @param committerSupplier supplies the committer handed to {@link #persist(Committer)} when a persist is triggered
     * @return {@code Plumber.THROWAWAY} when no sink accepts the row's timestamp, otherwise the sink's add result
     * @throws IndexSizeExceededException propagated from the sink
     */
    @Override
    public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException {
        final long rowTimestamp = row.getTimestampFromEpoch();
        final Sink targetSink = getSink(rowTimestamp);
        // The max timestamp is reported even for rows that end up being thrown away.
        metrics.reportMessageMaxTimestamp(rowTimestamp);
        if (targetSink == null) {
            return Plumber.THROWAWAY;
        }

        final IncrementalIndexAddResult result = targetSink.add(row, false);
        if (config.isReportParseExceptions()) {
            if (result.getParseException() != null) {
                throw result.getParseException();
            }
        }

        final boolean sinkIsFull = !targetSink.canAppendRow();
        if (sinkIsFull || System.currentTimeMillis() > nextFlush) {
            persist(committerSupplier.get());
        }
        return result;
    }

    /**
     * Looks up, or lazily creates and registers, the sink whose segment-granularity bucket
     * contains {@code timestamp}.
     *
     * @param timestamp row timestamp in epoch millis
     * @return the sink for the bucket, or null when the rejection policy does not accept the timestamp
     */
    private Sink getSink(long timestamp) {
        if (!rejectionPolicy.accept(timestamp)) {
            return null;
        }

        final Granularity granularity = schema.getGranularitySpec().getSegmentGranularity();
        final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
        final DateTime bucketStart = granularity.bucketStart(DateTimes.utc(timestamp));

        final Sink existing = sinks.get(bucketStart.getMillis());
        if (existing != null) {
            return existing;
        }

        // No sink for this bucket yet: build one spanning exactly one granularity step.
        final Interval sinkInterval = new Interval(bucketStart, granularity.increment(bucketStart));
        final Sink created = new Sink(
            sinkInterval,
            schema,
            config.getShardSpec(),
            versioningPolicy.getVersion(sinkInterval),
            config.getAppendableIndexSpec(),
            config.getMaxRowsInMemory(),
            config.getMaxBytesInMemoryOrDefault(),
            true,
            config.getDedupColumn()
        );
        addSink(created);
        return created;
    }

    /**
     * Delegates to the sink query walker, using the intervals declared by the query itself.
     *
     * @param query the query to build a runner for
     * @return a runner over the sinks intersecting the query's intervals
     */
    @Override
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        final Iterable<Interval> queryIntervals = query.getIntervals();
        return texasRanger.getQueryRunnerForIntervals(query, queryIntervals);
    }

    /**
     * Swaps out every swappable sink's current hydrant and submits a task to the persist executor
     * that writes those hydrants to disk and then runs the committer.
     *
     * <p>When the committer carries metadata, it is stored with each persisted hydrant together
     * with the current wall-clock time, under {@code COMMIT_METADATA_KEY} and
     * {@code COMMIT_METADATA_TIMESTAMP_KEY}.
     *
     * <p>The time spent submitting the task is recorded as persist back-pressure, and a warning is
     * logged when it exceeds {@code WARN_DELAY} millis.
     *
     * @param committer run after all hydrants have been persisted; its metadata may be null
     */
    @Override
    public void persist(Committer committer) {
        final ArrayList<Pair<FireHydrant, Interval>> indexesToPersist = new ArrayList<>();
        for (Sink sink : sinks.values()) {
            if (sink.swappable()) {
                indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));
            }
        }

        log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource());

        final Stopwatch runExecStopwatch = Stopwatch.createStarted();
        final Stopwatch persistStopwatch = Stopwatch.createStarted();

        // Read the metadata once so the null check and the stored value cannot disagree.
        final Object commitMetadata = committer.getMetadata();
        final Map<String, Object> metadataElems = commitMetadata == null
            ? null
            : ImmutableMap.<String, Object>of(
                COMMIT_METADATA_KEY, commitMetadata,
                COMMIT_METADATA_TIMESTAMP_KEY, System.currentTimeMillis()
            );

        persistExecutor.execute(
            new ThreadRenamingRunnable(StringUtils.format("%s-incremental-persist", schema.getDataSource())) {
                @Override
                public void doRun() {
                    final long persistThreadCpuTime = JvmUtils.safeGetThreadCpuTime();
                    try {
                        for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
                            metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs, metadataElems));
                        }
                        // Commit only after every hydrant has been written.
                        committer.run();
                    }
                    catch (Exception e) {
                        metrics.incrementFailedPersists();
                        throw e;
                    }
                    finally {
                        metrics.incrementPersistCpuTime(JvmUtils.safeGetThreadCpuTime() - persistThreadCpuTime);
                        metrics.incrementNumPersists();
                        metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
                        persistStopwatch.stop();
                    }
                }
            }
        );

        // Time spent inside execute() is reported as back-pressure.
        final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
        metrics.incrementPersistBackPressureMillis(startDelay);
        if (startDelay > WARN_DELAY) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
        }
        runExecStopwatch.stop();
        resetNextFlush();
    }

    /**
     * Submits a task to the merge executor that persists any not-yet-swapped hydrants of
     * {@code sink}, merges all of its hydrants into one index, pushes the result through the
     * segment pusher, publishes it, and drops an "isPushedMarker" file in the sink's persist
     * directory. It then registers a handoff callback that abandons the sink.
     *
     * <p>A failure inside the task increments the failed-handoff metric and emits an alert; when
     * the plumber is shutting down it also marks the shutdown as unclean and abandons the sink.
     *
     * @param truncatedTime key under which {@code sink} is expected to be registered in {@code sinks}
     * @param sink          the sink to persist, merge and hand off
     */
    private void persistAndMerge(final long truncatedTime, final Sink sink) {
        String threadName = StringUtils.format((String)"%s-%s-persist-n-merge", (Object[])new Object[]{this.schema.getDataSource(), DateTimes.utc((long)truncatedTime)});
        this.mergeExecutor.execute((Runnable)new ThreadRenamingRunnable(threadName){
            final Interval interval;
            Stopwatch mergeStopwatch;
            {
                // NOTE(review): decompiler output; presumably stands for passing threadName to the
                // superclass constructor -- confirm against the original source.
                super(x0);
                this.interval = sink.getInterval();
                this.mergeStopwatch = null;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void doRun() {
                try {
                    File mergedFile;
                    // Bail out if this sink is no longer the one registered under truncatedTime.
                    if (RealtimePlumber.this.sinks.get(truncatedTime) != sink) {
                        log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", new Object[]{sink});
                        return;
                    }
                    File persistDir = RealtimePlumber.this.computePersistDir(RealtimePlumber.this.schema, this.interval);
                    File mergedTarget = new File(persistDir, "merged");
                    File isPushedMarker = new File(persistDir, "isPushedMarker");
                    if (!isPushedMarker.exists()) {
                        // Not pushed yet: remove any previous merge output before merging again.
                        RealtimePlumber.this.removeSegment(sink, mergedTarget);
                        if (mergedTarget.exists()) {
                            log.warn("Merged target[%s] still exists after attempt to delete it; skipping push.", new Object[]{mergedTarget});
                            return;
                        }
                    } else {
                        // The marker file means an earlier run already pushed and published this sink.
                        log.info("Already pushed sink[%s]", new Object[]{sink});
                        return;
                    }
                    // Persist every hydrant that has not been swapped yet, holding the hydrant's monitor.
                    Iterator<FireHydrant> iterator = sink.iterator();
                    while (iterator.hasNext()) {
                        FireHydrant hydrant;
                        FireHydrant fireHydrant = hydrant = iterator.next();
                        synchronized (fireHydrant) {
                            if (!hydrant.hasSwapped()) {
                                log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", new Object[]{hydrant, sink});
                                int rowCount = RealtimePlumber.this.persistHydrant(hydrant, RealtimePlumber.this.schema, this.interval, null);
                                RealtimePlumber.this.metrics.incrementRowOutputCount(rowCount);
                            }
                        }
                    }
                    long mergeThreadCpuTime = JvmUtils.safeGetThreadCpuTime();
                    this.mergeStopwatch = Stopwatch.createStarted();
                    ArrayList<QueryableIndex> indexes = new ArrayList<QueryableIndex>();
                    // Each closeable returned by getAndIncrementSegment() is registered with the closer,
                    // which is closed once the merge has finished (or failed).
                    try (Closer closer = Closer.create();){
                        for (FireHydrant fireHydrant : sink) {
                            Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
                            QueryableIndex queryableIndex = ((ReferenceCountingSegment)segmentAndCloseable.lhs).asQueryableIndex();
                            log.info("Adding hydrant[%s]", new Object[]{fireHydrant});
                            indexes.add(queryableIndex);
                            closer.register((Closeable)segmentAndCloseable.rhs);
                        }
                        mergedFile = RealtimePlumber.this.indexMerger.mergeQueryableIndex(indexes, RealtimePlumber.this.schema.getGranularitySpec().isRollup(), RealtimePlumber.this.schema.getAggregators(), null, mergedTarget, RealtimePlumber.this.config.getIndexSpec(), RealtimePlumber.this.config.getIndexSpecForIntermediatePersists(), (ProgressIndicator)new BaseProgressIndicator(), RealtimePlumber.this.config.getSegmentWriteOutMediumFactory(), -1);
                    }
                    RealtimePlumber.this.metrics.incrementMergeCpuTime(JvmUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
                    RealtimePlumber.this.metrics.incrementMergeTimeMillis(this.mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));
                    log.info("Pushing [%s] to deep storage", new Object[]{sink.getSegment().getId()});
                    DataSegment segment = RealtimePlumber.this.dataSegmentPusher.push(mergedFile, sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes, (DimensionsSpec)RealtimePlumber.this.schema.getDimensionsSpec())), false);
                    log.info("Inserting [%s] to the metadata store", new Object[]{sink.getSegment().getId()});
                    RealtimePlumber.this.segmentPublisher.publishSegment(segment);
                    // Record the successful push so a later run skips this sink (see the marker check above).
                    if (!isPushedMarker.createNewFile()) {
                        log.makeAlert("Failed to create marker file for [%s]", new Object[]{RealtimePlumber.this.schema.getDataSource()}).addData("interval", (Object)sink.getInterval()).addData("partitionNum", (Object)segment.getShardSpec().getPartitionNum()).addData("marker", (Object)isPushedMarker).emit();
                    }
                }
                catch (Exception e) {
                    RealtimePlumber.this.metrics.incrementFailedHandoffs();
                    log.makeAlert((Throwable)e, "Failed to persist merged index[%s]", new Object[]{RealtimePlumber.this.schema.getDataSource()}).addData("interval", (Object)this.interval).emit();
                    // During shutdown the sink is abandoned and finishJob() is told the shutdown was not clean.
                    if (RealtimePlumber.this.shuttingDown) {
                        RealtimePlumber.this.cleanShutdown = false;
                        RealtimePlumber.this.abandonSegment(truncatedTime, sink);
                    }
                }
                finally {
                    // The stopwatch is only created once merging starts, so it may still be null here.
                    if (this.mergeStopwatch != null) {
                        this.mergeStopwatch.stop();
                    }
                }
            }
        });
        // Registered right after the merge task is submitted; the callback is given the merge
        // executor and abandons the sink when it runs.
        this.handoffNotifier.registerSegmentHandoffCallback(new SegmentDescriptor(sink.getInterval(), sink.getVersion(), this.config.getShardSpec().getPartitionNum()), this.mergeExecutor, new Runnable(){

            @Override
            public void run() {
                RealtimePlumber.this.abandonSegment(sink.getInterval().getStartMillis(), sink);
                RealtimePlumber.this.metrics.incrementHandOffCount();
            }
        });
    }

    /**
     * Shuts the plumber down: schedules persist-and-merge for every remaining sink, then blocks
     * until all sinks have been removed (see {@link #abandonSegment(long, Sink)}), and finally
     * closes the handoff notifier and the executors.
     *
     * <p>A handoff condition timeout of 0 means wait indefinitely; otherwise the wait is bounded
     * by that many millis measured from the start of the wait.
     *
     * @throws ISE              if the handoff wait times out, or if any persist-and-merge failed during shutdown
     * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting thread is
     *                          interrupted; the thread's interrupt status is restored first
     */
    @Override
    public void finishJob() {
        log.info("Shutting down...");
        shuttingDown = true;

        for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
            entry.getValue().clearDedupCache();
            persistAndMerge(entry.getKey(), entry.getValue());
        }

        final long forceEndWaitTime = System.currentTimeMillis() + config.getHandoffConditionTimeout();
        while (!sinks.isEmpty()) {
            try {
                log.info(
                    "Cannot shut down yet! Sinks remaining: %s",
                    Collections2.transform(sinks.values(), sink -> sink.getSegment().getId())
                );

                synchronized (handoffCondition) {
                    // Re-check under the lock: abandonSegment() notifies after removing a sink.
                    while (!sinks.isEmpty()) {
                        if (config.getHandoffConditionTimeout() == 0L) {
                            handoffCondition.wait();
                            continue;
                        }
                        final long remaining = forceEndWaitTime - System.currentTimeMillis();
                        if (remaining > 0L) {
                            handoffCondition.wait(remaining);
                            continue;
                        }
                        throw new ISE(
                            "Segment handoff wait timeout. [%s] segments might not have completed handoff.",
                            sinks.size()
                        );
                    }
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status so callers further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        handoffNotifier.close();
        shutdownExecutors();
        stopped = true;

        if (!cleanShutdown) {
            throw new ISE("Exception occurred during persist and merge.");
        }
    }

    /**
     * Moves {@code nextFlush} to the current UTC time plus the configured intermediate persist period.
     */
    private void resetNextFlush() {
        final DateTime now = DateTimes.nowUtc();
        nextFlush = now.plus(config.getIntermediatePersistPeriod()).getMillis();
    }

    /**
     * Lazily creates the persist, merge and scheduled executors; any executor that is already set
     * is left untouched.
     */
    protected void initializeExecutors() {
        final int maxPendingPersists = config.getMaxPendingPersists();

        if (persistExecutor == null) {
            final Integer persistPriority =
                TaskThreadPriority.getThreadPriorityFromTaskPriority(config.getPersistThreadPriority());
            persistExecutor = Execs.newBlockingSingleThreaded("plumber_persist_%d", maxPendingPersists, persistPriority);
        }

        if (mergeExecutor == null) {
            final Integer mergePriority =
                TaskThreadPriority.getThreadPriorityFromTaskPriority(config.getMergeThreadPriority());
            // The merge executor uses a fixed capacity argument of 1.
            mergeExecutor = Execs.newBlockingSingleThreaded("plumber_merge_%d", 1, mergePriority);
        }

        if (scheduledExecutor == null) {
            scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
        }
    }

    /**
     * Requests an orderly shutdown of every executor that has been created.
     *
     * <p>Each executor is checked on its own, so a subclass that initializes only some of them
     * neither triggers a {@link NullPointerException} here nor leaves the others running.
     */
    protected void shutdownExecutors() {
        // Copy the volatile fields to locals so the null check and the call see the same reference.
        final ExecutorService scheduled = scheduledExecutor;
        if (scheduled != null) {
            scheduled.shutdown();
        }
        final ExecutorService persist = persistExecutor;
        if (persist != null) {
            persist.shutdown();
        }
        final ExecutorService merge = mergeExecutor;
        if (merge != null) {
            merge.shutdown();
        }
    }

    /**
     * Rebuilds sinks from hydrants previously persisted under the base persist directory.
     *
     * <p>Each entry of the base directory is treated as one sink whose name encodes its interval
     * (with '_' in place of '/'); its numerically named children are loaded as hydrants in
     * ascending numeric order. A hydrant that fails to load is copied to the corrupted-file dump
     * directory and deleted from the sink directory. Entries whose children cannot be listed, or
     * that yield no hydrants, are skipped.
     *
     * @return the commit metadata stored with the hydrant carrying the latest commit timestamp,
     *         or null when the base directory is missing or unreadable, or no hydrant carries
     *         commit metadata
     */
    protected Object bootstrapSinksFromDisk() {
        final VersioningPolicy versioningPolicy = config.getVersioningPolicy();

        final File baseDir = computeBaseDir(schema);
        if (baseDir == null || !baseDir.exists()) {
            return null;
        }

        final File[] files = baseDir.listFiles();
        if (files == null) {
            return null;
        }

        Object metadata = null;
        long latestCommitTime = 0L;
        for (File sinkDir : files) {
            final Interval sinkInterval = Intervals.of(sinkDir.getName().replace('_', '/'));

            // Only numerically named children are hydrant directories.
            final File[] sinkFiles = sinkDir.listFiles((dir, fileName) -> Ints.tryParse(fileName) != null);
            if (sinkFiles == null) {
                // listFiles() returns null when sinkDir is not a directory or an I/O error occurs.
                log.warn("Unable to list persisted segments in [%s], skipping sink creation.", sinkDir.getAbsolutePath());
                continue;
            }

            Arrays.sort(sinkFiles, new Comparator<File>() {
                @Override
                public int compare(File o1, File o2) {
                    try {
                        return Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName()));
                    }
                    catch (NumberFormatException e) {
                        log.error(e, "Couldn't compare as numbers? [%s][%s]", o1, o2);
                        return o1.compareTo(o2);
                    }
                }
            });

            // NOTE(review): this flag is never reset, so every hydrant after the first corrupted one
            // in the same sink is also moved aside (after being loaded) -- confirm this is intended.
            boolean isCorrupted = false;
            final ArrayList<FireHydrant> hydrants = new ArrayList<>();
            for (File segmentDir : sinkFiles) {
                log.info("Loading previously persisted segment at [%s]", segmentDir);

                // Double check on top of the filename filter above.
                if (Ints.tryParse(segmentDir.getName()) == null) {
                    continue;
                }

                QueryableIndex queryableIndex = null;
                try {
                    queryableIndex = indexIO.loadIndex(segmentDir);
                }
                catch (IOException e) {
                    log.error(e, "Problem loading segmentDir from disk.");
                    isCorrupted = true;
                }

                if (isCorrupted) {
                    try {
                        final File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema);
                        log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath());
                        FileUtils.copyDirectory(segmentDir, corruptSegmentDir);
                        org.apache.druid.java.util.common.FileUtils.deleteDirectory(segmentDir);
                    }
                    catch (Exception e1) {
                        log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath());
                    }
                    continue;
                }

                // Track the commit metadata of the most recently committed hydrant across all sinks.
                final Metadata segmentMetadata = queryableIndex.getMetadata();
                if (segmentMetadata != null) {
                    final Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY);
                    if (timestampObj != null) {
                        final long timestamp = (Long) timestampObj;
                        if (timestamp > latestCommitTime) {
                            log.info(
                                "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]",
                                segmentMetadata,
                                timestamp,
                                latestCommitTime
                            );
                            latestCommitTime = timestamp;
                            metadata = segmentMetadata.get(COMMIT_METADATA_KEY);
                        }
                    }
                }

                final SegmentId segmentId = SegmentId.of(
                    schema.getDataSource(),
                    sinkInterval,
                    versioningPolicy.getVersion(sinkInterval),
                    config.getShardSpec()
                );
                hydrants.add(
                    new FireHydrant(
                        new QueryableIndexSegment(queryableIndex, segmentId),
                        Integer.parseInt(segmentDir.getName())
                    )
                );
            }

            if (hydrants.isEmpty()) {
                log.warn("Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.", sinkDir.getAbsolutePath());
                continue;
            }

            final Sink currSink = new Sink(
                sinkInterval,
                schema,
                config.getShardSpec(),
                versioningPolicy.getVersion(sinkInterval),
                config.getAppendableIndexSpec(),
                config.getMaxRowsInMemory(),
                config.getMaxBytesInMemoryOrDefault(),
                true,
                config.getDedupColumn(),
                hydrants
            );
            addSink(currSink);
        }
        return metadata;
    }

    /**
     * Registers a sink in the sink map and the timeline, announces its segment, and then clears
     * the dedup caches of sinks that start before the allowed minimum time.
     *
     * <p>A failed announcement is reported as an alert; the sink stays registered.
     *
     * @param sink the sink to register
     */
    private void addSink(Sink sink) {
        final Interval interval = sink.getInterval();

        sinks.put(interval.getStartMillis(), sink);
        metrics.setSinkCount(sinks.size());
        sinkTimeline.add(interval, sink.getVersion(), new SingleElementPartitionChunk<>(sink));

        try {
            segmentAnnouncer.announceSegment(sink.getSegment());
        }
        catch (IOException ioe) {
            log.makeAlert(ioe, "Failed to announce new segment[%s]", schema.getDataSource())
               .addData("interval", sink.getInterval())
               .emit();
        }

        clearDedupCache();
    }

    /**
     * Schedules the "overseer" callable on the scheduled executor. Its first run is one window
     * period after the end of the current segment-granularity bucket, and it then repeats once per
     * bucket length. Each run calls {@link #mergeAndPush()} until the plumber is stopped.
     */
    protected void startPersistThread() {
        final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
        final Period windowPeriod = config.getWindowPeriod();
        final DateTime truncatedNow = segmentGranularity.bucketStart(DateTimes.nowUtc());
        final long windowMillis = windowPeriod.toStandardDuration().getMillis();

        log.info(
            "Expect to run at [%s]",
            DateTimes.nowUtc().plus(
                new Duration(
                    System.currentTimeMillis(),
                    segmentGranularity.increment(truncatedNow).getMillis() + windowMillis
                )
            )
        );

        final String threadName = StringUtils.format(
            "%s-overseer-%d",
            schema.getDataSource(),
            config.getShardSpec().getPartitionNum()
        );
        final ThreadRenamingCallable<ScheduledExecutors.Signal> overseer =
            new ThreadRenamingCallable<ScheduledExecutors.Signal>(threadName) {
                @Override
                public ScheduledExecutors.Signal doCall() {
                    // The stop flag is checked both before and after the (possibly long) merge-and-push pass.
                    if (!RealtimePlumber.this.stopped) {
                        RealtimePlumber.this.mergeAndPush();
                        if (!RealtimePlumber.this.stopped) {
                            return ScheduledExecutors.Signal.REPEAT;
                        }
                    }
                    log.info("Stopping merge-n-push overseer thread");
                    return ScheduledExecutors.Signal.STOP;
                }
            };

        final Duration initialDelay = new Duration(
            System.currentTimeMillis(),
            segmentGranularity.increment(truncatedNow).getMillis() + windowMillis
        );
        final Duration rate = new Duration(truncatedNow, segmentGranularity.increment(truncatedNow));
        ScheduledExecutors.scheduleAtFixedRate(scheduledExecutor, initialDelay, rate, overseer);
    }

    /**
     * Clears the dedup cache of every sink whose interval starts before the allowed minimum time.
     */
    private void clearDedupCache() {
        final long minTimestamp = getAllowedMinTime().getMillis();
        for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
            if (entry.getKey() < minTimestamp) {
                entry.getValue().clearDedupCache();
            }
        }
    }

    /**
     * Computes the start of the segment-granularity bucket containing
     * {@code max(windowMillis, currentMaxTime) - windowMillis}, where the current max time comes
     * from the rejection policy.
     *
     * @return the bucket start used as the cutoff by {@link #mergeAndPush()} and {@link #clearDedupCache()}
     */
    private DateTime getAllowedMinTime() {
        final Granularity granularity = schema.getGranularitySpec().getSegmentGranularity();
        final long windowMillis = config.getWindowPeriod().toStandardDuration().getMillis();
        final long currMaxMillis = rejectionPolicy.getCurrMaxTime().getMillis();
        // Clamp to windowMillis first so the subtraction below cannot go negative.
        final long cutoffMillis = Math.max(windowMillis, currMaxMillis) - windowMillis;
        return granularity.bucketStart(DateTimes.utc(cutoffMillis));
    }

    /**
     * Selects every sink whose interval starts before the allowed minimum time, clears its dedup
     * cache, and schedules persist-and-merge for it. Later sinks are left for a future run.
     */
    private void mergeAndPush() {
        log.info("Starting merge and push.");
        final DateTime minTimestampAsDate = getAllowedMinTime();
        final long minTimestamp = minTimestampAsDate.getMillis();

        log.info(
            "Found [%,d] segments. Attempting to hand off segments that start before [%s].",
            sinks.size(),
            minTimestampAsDate
        );

        // Collect first, schedule afterwards.
        final ArrayList<Map.Entry<Long, Sink>> sinksToPush = new ArrayList<>();
        for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
            final Long intervalStart = entry.getKey();
            if (intervalStart >= minTimestamp) {
                log.info(
                    "Skipping persist and merge for entry [%s] : Start time [%s] >= [%s] min timestamp required in this run. Segment will be picked up in a future run.",
                    entry,
                    DateTimes.utc(intervalStart),
                    minTimestampAsDate
                );
                continue;
            }
            log.info("Adding entry [%s] for merge and push.", entry);
            sinksToPush.add(entry);
            entry.getValue().clearDedupCache();
        }

        log.info("Found [%,d] sinks to persist and merge", sinksToPush.size());

        for (Map.Entry<Long, Sink> toPush : sinksToPush) {
            persistAndMerge(toPush.getKey(), toPush.getValue());
        }
    }

    /**
     * Drops a sink and everything associated with it, if the sink map still
     * contains {@code truncatedTime}.
     *
     * <p>In order: unannounces the segment, deletes its persist directory,
     * removes the sink from the sink map and the timeline, updates the sink
     * count metric, closes the cache entry of each hydrant and swaps its
     * segment to {@code null}, and finally wakes all threads waiting on the
     * handoff condition. Any exception raised along the way is reported as an
     * alert and not rethrown.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible
     * behaviour change." was reported for this method.
     *
     * @param truncatedTime key of the sink in the sink map
     * @param sink          the sink to abandon
     */
    protected void abandonSegment(long truncatedTime, Sink sink) {
        if (!this.sinks.containsKey(truncatedTime)) {
            return;
        }
        try {
            this.segmentAnnouncer.unannounceSegment(sink.getSegment());
            final File persistDir = this.computePersistDir(this.schema, sink.getInterval());
            this.removeSegment(sink, persistDir);
            log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getId());
            this.sinks.remove(truncatedTime);
            this.metrics.setSinkCount(this.sinks.size());
            this.sinkTimeline.remove(sink.getInterval(), (Object)sink.getVersion(), (PartitionChunk)new SingleElementPartitionChunk((Object)sink));
            for (FireHydrant hydrant : sink) {
                this.cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
                hydrant.swapSegment(null);
            }
            // Wake anything blocked waiting for handoff progress.
            synchronized (this.handoffCondition) {
                this.handoffCondition.notifyAll();
            }
        } catch (Exception e) {
            log.makeAlert((Throwable)e, "Unable to abandon old segment for dataSource[%s]", this.schema.getDataSource())
                .addData("interval", (Object)sink.getInterval())
                .emit();
        }
    }

    /**
     * Returns the per-data-source directory under the configured base persist
     * directory.
     *
     * @param schema schema whose data source name becomes the child directory name
     * @return {@code <basePersistDirectory>/<dataSource>}
     */
    protected File computeBaseDir(DataSchema schema) {
        final File persistRoot = this.config.getBasePersistDirectory();
        return new File(persistRoot, schema.getDataSource());
    }

    /**
     * Derives the directory into which a corrupted persist directory is dumped.
     *
     * <p>The result is the absolute path of {@code persistDir} with each
     * occurrence of the data source name replaced by
     * {@code corrupted<separator><dataSource>}, i.e. the data source directory
     * is nested under a {@code corrupted} directory.
     *
     * <p>{@link File#separator} (the directory-name separator) is used here.
     * The previous code used {@link File#pathSeparator}, which is the
     * separator for lists of paths ({@code :} or {@code ;}) and produced a
     * single directory literally named {@code corrupted:<dataSource>}.
     *
     * @param persistDir the persist directory that was found to be corrupted
     * @param schema     schema providing the data source name to substitute
     * @return the dump location for the corrupted files
     */
    protected File computeCorruptedFileDumpDir(File persistDir, DataSchema schema) {
        final String dataSource = schema.getDataSource();
        final String corruptedSubPath = "corrupted" + File.separator + dataSource;
        return new File(StringUtils.replace((String)persistDir.getAbsolutePath(), (String)dataSource, (String)corruptedSubPath));
    }

    /**
     * Returns the persist directory for one interval of a data source.
     *
     * <p>The directory name is the interval's string form with every
     * {@code '/'} replaced by {@code '_'}, placed under
     * {@link #computeBaseDir(DataSchema)}.
     *
     * @param schema   schema identifying the data source
     * @param interval interval whose persist directory is wanted
     * @return the interval-specific persist directory
     */
    protected File computePersistDir(DataSchema schema, Interval interval) {
        final File dataSourceDir = this.computeBaseDir(schema);
        final String intervalDirName = interval.toString().replace('/', '_');
        return new File(dataSourceDir, intervalDirName);
    }

    /**
     * Persists the in-memory index of a hydrant to disk and swaps the hydrant
     * over to the persisted segment.
     *
     * <p>The whole operation runs while holding the hydrant's monitor. A
     * hydrant that has already swapped is skipped. Otherwise
     * {@code metadataElems} is merged into the index metadata, the index is
     * written to {@code <persistDir>/<hydrant count>}, reloaded through
     * {@code indexIO}, and installed as the hydrant's segment.
     *
     * @param indexToPersist hydrant whose index should be persisted
     * @param schema         schema used for logging and to locate the persist directory
     * @param interval       interval of the data being persisted
     * @param metadataElems  entries added to the index metadata before persisting
     * @return number of rows in the index at persist time, or {@code 0} if the
     *         hydrant had already swapped
     * @throws RuntimeException wrapping the {@link IOException} if persisting or
     *                          loading fails; an alert is emitted first
     */
    protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval, Map<String, Object> metadataElems) {
        synchronized (indexToPersist) {
            if (indexToPersist.hasSwapped()) {
                log.info("DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.", new Object[]{schema.getDataSource(), interval, indexToPersist});
                return 0;
            }
            log.info("DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]", new Object[]{schema.getDataSource(), interval, metadataElems, indexToPersist});
            try {
                // Capture the row count before the swap replaces the in-memory index.
                int numRows = indexToPersist.getIndex().size();
                indexToPersist.getIndex().getMetadata().putAll(metadataElems);
                File persistedFile = this.indexMerger.persist(indexToPersist.getIndex(), interval, new File(this.computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), this.config.getIndexSpecForIntermediatePersists(), this.config.getSegmentWriteOutMediumFactory());
                indexToPersist.swapSegment((Segment)new QueryableIndexSegment(this.indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId()));
                return numRows;
            }
            catch (IOException e) {
                // Attach the exception to the alert so the failure cause is visible in the alert itself,
                // matching the other alerts raised in this class.
                log.makeAlert((Throwable)e, "dataSource[%s] -- incremental persist failed", new Object[]{schema.getDataSource()}).addData("interval", (Object)interval).addData("count", (Object)indexToPersist.getCount()).emit();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Deletes the on-disk directory of a sink, if it exists.
     *
     * <p>Deletion is best-effort: a failure is reported as an alert carrying
     * the file and the sink's interval, and is not rethrown.
     *
     * @param sink   sink the directory belongs to (used for alert context only)
     * @param target directory to delete
     */
    private void removeSegment(Sink sink, File target) {
        if (!target.exists()) {
            return;
        }
        try {
            log.info("Deleting Index File[%s]", target);
            org.apache.druid.java.util.common.FileUtils.deleteDirectory((File)target);
        } catch (Exception e) {
            log.makeAlert((Throwable)e, "Unable to remove file for dataSource[%s]", this.schema.getDataSource())
                .addData("file", (Object)target)
                .addData("interval", (Object)sink.getInterval())
                .emit();
        }
    }
}

