/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.BaseProgressIndicator;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.appenderator.Committed;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentNotWritableException;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import org.joda.time.ReadablePeriod;

public class AppenderatorImpl
implements Appenderator {
    public static final int ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER = 1000;
    public static final int ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER = 700;
    public static final int ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER = 600;
    public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
    public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
    private static final EmittingLogger log = new EmittingLogger(AppenderatorImpl.class);
    private static final int WARN_DELAY = 1000;
    private static final String IDENTIFIER_FILE_NAME = "identifier.json";
    private final String myId;
    private final DataSchema schema;
    private final AppenderatorConfig tuningConfig;
    private final FireDepartmentMetrics metrics;
    private final DataSegmentPusher dataSegmentPusher;
    private final ObjectMapper objectMapper;
    private final DataSegmentAnnouncer segmentAnnouncer;
    private final IndexIO indexIO;
    private final IndexMerger indexMerger;
    private final Cache cache;
    private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<SegmentIdWithShardSpec, Sink>();
    private final Set<SegmentIdWithShardSpec> droppingSinks = Sets.newConcurrentHashSet();
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
    private final long maxBytesTuningConfig;
    private final boolean skipBytesInMemoryOverheadCheck;
    private final QuerySegmentWalker texasRanger;
    private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
    private final AtomicInteger totalRows = new AtomicInteger();
    private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
    private final RowIngestionMeters rowIngestionMeters;
    private final ParseExceptionHandler parseExceptionHandler;
    private final Lock commitLock = new ReentrantLock();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile ListeningExecutorService persistExecutor = null;
    private volatile ListeningExecutorService pushExecutor = null;
    private volatile ListeningExecutorService intermediateTempExecutor = null;
    private volatile long nextFlush;
    private volatile FileLock basePersistDirLock = null;
    private volatile FileChannel basePersistDirLockChannel = null;
    private volatile Throwable persistError;
    private final boolean isOpenSegments;
    private final boolean useMaxMemoryEstimates;
    private final Map<FireHydrant, Pair<File, SegmentId>> persistedHydrantMetadata = Collections.synchronizedMap(new IdentityHashMap());

    AppenderatorImpl(String id, DataSchema schema, AppenderatorConfig tuningConfig, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, DataSegmentAnnouncer segmentAnnouncer, @Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker, IndexIO indexIO, IndexMerger indexMerger, Cache cache, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler, boolean isOpenSegments, boolean useMaxMemoryEstimates) {
        this.myId = id;
        this.schema = (DataSchema)Preconditions.checkNotNull((Object)schema, (Object)"schema");
        this.tuningConfig = (AppenderatorConfig)Preconditions.checkNotNull((Object)tuningConfig, (Object)"tuningConfig");
        this.metrics = (FireDepartmentMetrics)Preconditions.checkNotNull((Object)metrics, (Object)"metrics");
        this.dataSegmentPusher = (DataSegmentPusher)Preconditions.checkNotNull((Object)dataSegmentPusher, (Object)"dataSegmentPusher");
        this.objectMapper = (ObjectMapper)Preconditions.checkNotNull((Object)objectMapper, (Object)"objectMapper");
        this.segmentAnnouncer = (DataSegmentAnnouncer)Preconditions.checkNotNull((Object)segmentAnnouncer, (Object)"segmentAnnouncer");
        this.indexIO = (IndexIO)Preconditions.checkNotNull((Object)indexIO, (Object)"indexIO");
        this.indexMerger = (IndexMerger)Preconditions.checkNotNull((Object)indexMerger, (Object)"indexMerger");
        this.cache = cache;
        this.texasRanger = sinkQuerySegmentWalker;
        this.rowIngestionMeters = (RowIngestionMeters)Preconditions.checkNotNull((Object)rowIngestionMeters, (Object)"rowIngestionMeters");
        this.parseExceptionHandler = (ParseExceptionHandler)Preconditions.checkNotNull((Object)parseExceptionHandler, (Object)"parseExceptionHandler");
        this.isOpenSegments = isOpenSegments;
        this.useMaxMemoryEstimates = useMaxMemoryEstimates;
        this.sinkTimeline = sinkQuerySegmentWalker == null ? new VersionedIntervalTimeline(String.CASE_INSENSITIVE_ORDER) : sinkQuerySegmentWalker.getSinkTimeline();
        this.maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
        this.skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
        if (isOpenSegments) {
            log.debug("Running open segments appenderator", new Object[0]);
        } else {
            log.debug("Running closed segments appenderator", new Object[0]);
        }
    }

    @Override
    public String getId() {
        return this.myId;
    }

    @Override
    public String getDataSource() {
        return this.schema.getDataSource();
    }

    @Override
    public Object startJob() {
        this.lockBasePersistDirectory();
        Object retVal = this.bootstrapSinksFromDisk();
        this.initializeExecutors();
        this.resetNextFlush();
        return retVal;
    }

    private void throwPersistErrorIfExists() {
        if (this.persistError != null) {
            throw new RE(this.persistError, "Error while persisting", new Object[0]);
        }
    }

    @Override
    public Appenderator.AppenderatorAddResult add(SegmentIdWithShardSpec identifier, InputRow row, @Nullable Supplier<Committer> committerSupplier, boolean allowIncrementalPersists) throws IndexSizeExceededException, SegmentNotWritableException {
        long bytesInMemoryAfterAdd;
        int sinkRowsInMemoryAfterAdd;
        IncrementalIndexAddResult addResult;
        this.throwPersistErrorIfExists();
        if (!identifier.getDataSource().equals(this.schema.getDataSource())) {
            throw new IAE("Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!", new Object[]{this.schema.getDataSource(), identifier.getDataSource()});
        }
        Sink sink = this.getOrCreateSink(identifier);
        this.metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
        int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
        long bytesInMemoryBeforeAdd = sink.getBytesInMemory();
        try {
            addResult = sink.add(row, !allowIncrementalPersists);
            sinkRowsInMemoryAfterAdd = addResult.getRowCount();
            bytesInMemoryAfterAdd = addResult.getBytesInMemory();
        }
        catch (IndexSizeExceededException e) {
            log.error((Throwable)e, "Sink for segment[%s] was unexpectedly full!", new Object[]{identifier});
            throw e;
        }
        if (sinkRowsInMemoryAfterAdd < 0) {
            throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
        }
        if (addResult.isRowAdded()) {
            this.rowIngestionMeters.incrementProcessed();
        } else if (addResult.hasParseException()) {
            this.parseExceptionHandler.handle(addResult.getParseException());
        }
        int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
        this.rowsCurrentlyInMemory.addAndGet(numAddedRows);
        this.bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd);
        this.totalRows.addAndGet(numAddedRows);
        boolean isPersistRequired = false;
        boolean persist = false;
        ArrayList<String> persistReasons = new ArrayList<String>();
        if (!sink.canAppendRow()) {
            persist = true;
            persistReasons.add("No more rows can be appended to sink");
        }
        if (System.currentTimeMillis() > this.nextFlush) {
            persist = true;
            persistReasons.add(StringUtils.format((String)"current time[%d] is greater than nextFlush[%d]", (Object[])new Object[]{System.currentTimeMillis(), this.nextFlush}));
        }
        if (this.rowsCurrentlyInMemory.get() >= this.tuningConfig.getMaxRowsInMemory()) {
            persist = true;
            persistReasons.add(StringUtils.format((String)"rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]", (Object[])new Object[]{this.rowsCurrentlyInMemory.get(), this.tuningConfig.getMaxRowsInMemory()}));
        }
        if (this.bytesCurrentlyInMemory.get() >= this.maxBytesTuningConfig) {
            persist = true;
            persistReasons.add(StringUtils.format((String)"(estimated) bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]", (Object[])new Object[]{this.bytesCurrentlyInMemory.get(), this.maxBytesTuningConfig}));
        }
        if (persist) {
            if (allowIncrementalPersists) {
                log.info("Flushing in-memory data to disk because %s.", new Object[]{String.join((CharSequence)",", persistReasons)});
                long bytesToBePersisted = 0L;
                for (Map.Entry entry : this.sinks.entrySet()) {
                    Sink sinkEntry = (Sink)entry.getValue();
                    if (sinkEntry == null) continue;
                    bytesToBePersisted += sinkEntry.getBytesInMemory();
                    if (!sinkEntry.swappable()) continue;
                    int memoryStillInUse = this.calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
                    this.bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
                }
                if (!this.skipBytesInMemoryOverheadCheck && this.bytesCurrentlyInMemory.get() - bytesToBePersisted > this.maxBytesTuningConfig) {
                    String alertMessage = StringUtils.format((String)"Task has exceeded safe estimated heap usage limits, failing (numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])", (Object[])new Object[]{this.sinks.size(), this.sinks.values().stream().mapToInt(Iterables::size).sum(), this.getTotalRowCount(), this.bytesCurrentlyInMemory.get(), bytesToBePersisted, this.maxBytesTuningConfig});
                    String errorMessage = StringUtils.format((String)"%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to great to have enough space to process additional input rows. This check, along with metering the overhead of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting 'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so might allow the task to naturally encounter a 'java.lang.OutOfMemoryError'). Alternatively, 'maxBytesInMemory' can be increased which will cause an increase in heap footprint, but will allow for more intermediary segment persists to occur before reaching this condition.", (Object[])new Object[]{alertMessage});
                    log.makeAlert(alertMessage, new Object[0]).addData("dataSource", (Object)this.schema.getDataSource()).emit();
                    throw new RuntimeException(errorMessage);
                }
                Futures.addCallback(this.persistAll(committerSupplier == null ? null : (Committer)committerSupplier.get()), (FutureCallback)new FutureCallback<Object>(){

                    public void onSuccess(@Nullable Object result) {
                    }

                    public void onFailure(Throwable t) {
                        AppenderatorImpl.this.persistError = t;
                    }
                }, (Executor)MoreExecutors.directExecutor());
            } else {
                isPersistRequired = true;
            }
        }
        return new Appenderator.AppenderatorAddResult(identifier, sink.getNumRows(), isPersistRequired);
    }

    @Override
    public List<SegmentIdWithShardSpec> getSegments() {
        return ImmutableList.copyOf(this.sinks.keySet());
    }

    @Override
    public int getRowCount(SegmentIdWithShardSpec identifier) {
        Sink sink = (Sink)this.sinks.get(identifier);
        if (sink == null) {
            throw new ISE("No such sink: %s", new Object[]{identifier});
        }
        return sink.getNumRows();
    }

    @Override
    public int getTotalRowCount() {
        return this.totalRows.get();
    }

    @VisibleForTesting
    int getRowsInMemory() {
        return this.rowsCurrentlyInMemory.get();
    }

    @VisibleForTesting
    long getBytesCurrentlyInMemory() {
        return this.bytesCurrentlyInMemory.get();
    }

    @VisibleForTesting
    long getBytesInMemory(SegmentIdWithShardSpec identifier) {
        Sink sink = (Sink)this.sinks.get(identifier);
        if (sink == null) {
            throw new ISE("No such sink: %s", new Object[]{identifier});
        }
        return sink.getBytesInMemory();
    }

    private Sink getOrCreateSink(SegmentIdWithShardSpec identifier) {
        Sink retVal = (Sink)this.sinks.get(identifier);
        if (retVal == null) {
            retVal = new Sink(identifier.getInterval(), this.schema, identifier.getShardSpec(), identifier.getVersion(), this.tuningConfig.getAppendableIndexSpec(), this.tuningConfig.getMaxRowsInMemory(), this.maxBytesTuningConfig, this.useMaxMemoryEstimates, null);
            this.bytesCurrentlyInMemory.addAndGet(this.calculateSinkMemoryInUsed());
            try {
                this.segmentAnnouncer.announceSegment(retVal.getSegment());
            }
            catch (IOException e) {
                log.makeAlert((Throwable)e, "Failed to announce new segment[%s]", new Object[]{this.schema.getDataSource()}).addData("interval", (Object)retVal.getInterval()).emit();
            }
            this.sinks.put(identifier, retVal);
            this.metrics.setSinkCount(this.sinks.size());
            this.sinkTimeline.add(retVal.getInterval(), (Object)retVal.getVersion(), identifier.getShardSpec().createChunk((Object)retVal));
        }
        return retVal;
    }

    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) {
        if (this.texasRanger == null) {
            throw new IllegalStateException("Don't query me, bro.");
        }
        return this.texasRanger.getQueryRunnerForIntervals(query, intervals);
    }

    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs) {
        if (this.texasRanger == null) {
            throw new IllegalStateException("Don't query me, bro.");
        }
        return this.texasRanger.getQueryRunnerForSegments(query, specs);
    }

    @Override
    public void clear() throws InterruptedException {
        try {
            this.throwPersistErrorIfExists();
            if (this.persistExecutor != null) {
                ListenableFuture uncommitFuture = this.persistExecutor.submit(() -> {
                    try {
                        this.commitLock.lock();
                        this.objectMapper.writeValue(this.computeCommitFile(), (Object)Committed.nil());
                    }
                    finally {
                        this.commitLock.unlock();
                    }
                    return null;
                });
                uncommitFuture.get();
                ArrayList futures = new ArrayList();
                for (Map.Entry entry : this.sinks.entrySet()) {
                    futures.add(this.abandonSegment((SegmentIdWithShardSpec)entry.getKey(), (Sink)entry.getValue(), true));
                }
                this.persistedHydrantMetadata.clear();
                Futures.allAsList(futures).get();
            }
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public ListenableFuture<?> drop(SegmentIdWithShardSpec identifier) {
        Sink sink = (Sink)this.sinks.get(identifier);
        if (sink != null) {
            return this.abandonSegment(identifier, sink, true);
        }
        return Futures.immediateFuture(null);
    }

    @Override
    public ListenableFuture<Object> persistAll(final @Nullable Committer committer) {
        this.throwPersistErrorIfExists();
        final HashMap<String, Integer> currentHydrants = new HashMap<String, Integer>();
        final ArrayList<Pair> indexesToPersist = new ArrayList<Pair>();
        int numPersistedRows = 0;
        long bytesPersisted = 0L;
        final MutableLong totalHydrantsCount = new MutableLong();
        final MutableLong totalHydrantsPersisted = new MutableLong();
        final long totalSinks = this.sinks.size();
        for (Map.Entry entry : this.sinks.entrySet()) {
            SegmentIdWithShardSpec identifier = (SegmentIdWithShardSpec)entry.getKey();
            Sink sink = (Sink)entry.getValue();
            if (sink == null) {
                throw new ISE("No sink for identifier: %s", new Object[]{identifier});
            }
            ArrayList hydrants = Lists.newArrayList((Iterable)sink);
            totalHydrantsCount.add((long)hydrants.size());
            currentHydrants.put(identifier.toString(), hydrants.size());
            numPersistedRows += sink.getNumRowsInMemory();
            bytesPersisted += sink.getBytesInMemory();
            int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
            for (FireHydrant hydrant : hydrants.subList(0, limit)) {
                if (hydrant.hasSwapped()) continue;
                log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", new Object[]{hydrant, identifier});
                indexesToPersist.add(Pair.of((Object)hydrant, (Object)identifier));
                totalHydrantsPersisted.add(1L);
            }
            if (!sink.swappable()) continue;
            indexesToPersist.add(Pair.of((Object)sink.swap(), (Object)identifier));
            totalHydrantsPersisted.add(1L);
        }
        log.debug("Submitting persist runnable for dataSource[%s]", new Object[]{this.schema.getDataSource()});
        final Object commitMetadata = committer == null ? null : committer.getMetadata();
        Stopwatch runExecStopwatch = Stopwatch.createStarted();
        final Stopwatch persistStopwatch = Stopwatch.createStarted();
        final AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows);
        ListenableFuture future = this.persistExecutor.submit((Callable)new Callable<Object>(){

            @Override
            public Object call() throws IOException {
                try {
                    Object commitHydrants;
                    for (Pair pair : indexesToPersist) {
                        AppenderatorImpl.this.metrics.incrementRowOutputCount(AppenderatorImpl.this.persistHydrant((FireHydrant)pair.lhs, (SegmentIdWithShardSpec)pair.rhs));
                    }
                    if (committer != null) {
                        log.debug("Committing metadata[%s] for sinks[%s].", new Object[]{commitMetadata, Joiner.on((String)", ").join((Iterable)currentHydrants.entrySet().stream().map(entry -> StringUtils.format((String)"%s:%d", (Object[])new Object[]{entry.getKey(), entry.getValue()})).collect(Collectors.toList()))});
                        committer.run();
                        try {
                            AppenderatorImpl.this.commitLock.lock();
                            commitHydrants = new HashMap<String, Integer>();
                            Committed oldCommit = AppenderatorImpl.this.readCommit();
                            if (oldCommit != null) {
                                commitHydrants.putAll(oldCommit.getHydrants());
                            }
                            commitHydrants.putAll(currentHydrants);
                            AppenderatorImpl.this.writeCommit(new Committed((Map<String, Integer>)commitHydrants, commitMetadata));
                        }
                        finally {
                            AppenderatorImpl.this.commitLock.unlock();
                        }
                    }
                    log.info("Flushed in-memory data with commit metadata [%s] for segments: %s", new Object[]{commitMetadata, indexesToPersist.stream().map(itp -> ((SegmentIdWithShardSpec)itp.rhs).asSegmentId().toString()).distinct().collect(Collectors.joining(", "))});
                    log.info("Persisted stats: processed rows: [%d], persisted rows[%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]", new Object[]{AppenderatorImpl.this.rowIngestionMeters.getProcessed(), totalPersistedRows.get(), totalSinks, totalHydrantsCount.longValue(), totalHydrantsPersisted.longValue()});
                    commitHydrants = commitMetadata;
                    return commitHydrants;
                }
                catch (IOException e) {
                    AppenderatorImpl.this.metrics.incrementFailedPersists();
                    throw e;
                }
                finally {
                    AppenderatorImpl.this.metrics.incrementNumPersists();
                    AppenderatorImpl.this.metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
                    persistStopwatch.stop();
                }
            }
        });
        long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
        this.metrics.incrementPersistBackPressureMillis(startDelay);
        if (startDelay > 1000L) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", new Object[]{startDelay});
        }
        runExecStopwatch.stop();
        this.resetNextFlush();
        this.rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
        this.bytesCurrentlyInMemory.addAndGet(-bytesPersisted);
        log.info("Persisted rows[%,d] and (estimated) bytes[%,d]", new Object[]{numPersistedRows, bytesPersisted});
        return future;
    }

    @Override
    public ListenableFuture<SegmentsAndCommitMetadata> push(Collection<SegmentIdWithShardSpec> identifiers, @Nullable Committer committer, boolean useUniquePath) {
        HashMap<SegmentIdWithShardSpec, Sink> theSinks = new HashMap<SegmentIdWithShardSpec, Sink>();
        AtomicLong pushedHydrantsCount = new AtomicLong();
        for (SegmentIdWithShardSpec identifier : identifiers) {
            Sink sink = (Sink)this.sinks.get(identifier);
            if (sink == null) {
                throw new ISE("No sink for identifier: %s", new Object[]{identifier});
            }
            theSinks.put(identifier, sink);
            if (sink.finishWriting()) {
                this.totalRows.addAndGet(-sink.getNumRows());
            }
            pushedHydrantsCount.addAndGet(Iterables.size((Iterable)sink));
        }
        return Futures.transform(this.persistAll(committer), commitMetadata -> {
            ArrayList<DataSegment> dataSegments = new ArrayList<DataSegment>();
            log.info("Preparing to push (stats): processed rows: [%d], sinks: [%d], fireHydrants (across sinks): [%d]", new Object[]{this.rowIngestionMeters.getProcessed(), theSinks.size(), pushedHydrantsCount.get()});
            log.debug("Building and pushing segments: %s", new Object[]{theSinks.keySet().stream().map(SegmentIdWithShardSpec::toString).collect(Collectors.joining(", "))});
            for (Map.Entry entry : theSinks.entrySet()) {
                if (this.droppingSinks.contains(entry.getKey())) {
                    log.warn("Skipping push of currently-dropping sink[%s]", new Object[]{entry.getKey()});
                    continue;
                }
                DataSegment dataSegment = this.mergeAndPush((SegmentIdWithShardSpec)entry.getKey(), (Sink)entry.getValue(), useUniquePath);
                if (dataSegment != null) {
                    dataSegments.add(dataSegment);
                    continue;
                }
                log.warn("mergeAndPush[%s] returned null, skipping.", new Object[]{entry.getKey()});
            }
            log.info("Push complete...", new Object[0]);
            return new SegmentsAndCommitMetadata(dataSegments, commitMetadata);
        }, (Executor)this.pushExecutor);
    }

    private ListenableFuture<?> pushBarrier() {
        return this.intermediateTempExecutor.submit(() -> this.pushExecutor.submit(() -> {}));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private DataSegment mergeAndPush(SegmentIdWithShardSpec identifier, Sink sink, boolean useUniquePath) {
        if (this.sinks.get(identifier) != sink) {
            log.warn("Sink for segment[%s] no longer valid, bailing out of mergeAndPush.", new Object[]{identifier});
            return null;
        }
        File persistDir = this.computePersistDir(identifier);
        File mergedTarget = new File(persistDir, "merged");
        File descriptorFile = this.computeDescriptorFile(identifier);
        for (FireHydrant hydrant : sink) {
            if (sink.isWritable()) {
                throw new ISE("Expected sink to be no longer writable before mergeAndPush for segment[%s].", new Object[]{identifier});
            }
            FireHydrant fireHydrant = hydrant;
            synchronized (fireHydrant) {
                if (!hydrant.hasSwapped()) {
                    throw new ISE("Expected sink to be fully persisted before mergeAndPush for segment[%s].", new Object[]{identifier});
                }
            }
        }
        try {
            long mergeFinishTime;
            File mergedFile;
            if (descriptorFile.exists()) {
                if (useUniquePath) {
                    log.debug("Segment[%s] already pushed, but we want a unique path, so will push again with a new path.", new Object[]{identifier});
                } else {
                    log.info("Segment[%s] already pushed, skipping.", new Object[]{identifier});
                    return (DataSegment)this.objectMapper.readValue(descriptorFile, DataSegment.class);
                }
            }
            this.removeDirectory(mergedTarget);
            if (mergedTarget.exists()) {
                throw new ISE("Merged target[%s] exists after removing?!", new Object[]{mergedTarget});
            }
            long startTime = System.nanoTime();
            ArrayList<QueryableIndex> indexes = new ArrayList<QueryableIndex>();
            try (Closer closer = Closer.create();){
                for (FireHydrant fireHydrant : sink) {
                    if (!this.isOpenSegments()) {
                        Pair<File, SegmentId> persistedMetadata = this.persistedHydrantMetadata.get(fireHydrant);
                        if (persistedMetadata == null) {
                            throw new ISE("Persisted metadata for batch hydrant [%s] is null!", new Object[]{fireHydrant});
                        }
                        File persistedFile = (File)persistedMetadata.lhs;
                        SegmentId persistedSegmentId = (SegmentId)persistedMetadata.rhs;
                        if (persistedFile == null) {
                            throw new ISE("Persisted file for batch hydrant [%s] is null!", new Object[]{fireHydrant});
                        }
                        if (persistedSegmentId == null) {
                            throw new ISE("Persisted segmentId for batch hydrant in file [%s] is null!", new Object[]{persistedFile.getPath()});
                        }
                        fireHydrant.swapSegment((Segment)new QueryableIndexSegment(this.indexIO.loadIndex(persistedFile), persistedSegmentId));
                    }
                    Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
                    QueryableIndex queryableIndex = ((ReferenceCountingSegment)segmentAndCloseable.lhs).asQueryableIndex();
                    log.debug("Segment[%s] adding hydrant[%s]", new Object[]{identifier, fireHydrant});
                    indexes.add(queryableIndex);
                    closer.register((Closeable)segmentAndCloseable.rhs);
                }
                mergedFile = this.indexMerger.mergeQueryableIndex(indexes, this.schema.getGranularitySpec().isRollup(), this.schema.getAggregators(), this.schema.getDimensionsSpec(), mergedTarget, this.tuningConfig.getIndexSpec(), this.tuningConfig.getIndexSpecForIntermediatePersists(), (ProgressIndicator)new BaseProgressIndicator(), this.tuningConfig.getSegmentWriteOutMediumFactory(), this.tuningConfig.getMaxColumnsToMerge());
                mergeFinishTime = System.nanoTime();
                log.debug("Segment[%s] built in %,dms.", new Object[]{identifier, (mergeFinishTime - startTime) / 1000000L});
            }
            DataSegment segmentToPush = sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes, (DimensionsSpec)this.schema.getDimensionsSpec()));
            DataSegment segment = (DataSegment)RetryUtils.retry(() -> this.dataSegmentPusher.push(mergedFile, segmentToPush, useUniquePath), exception -> exception instanceof Exception, (int)5);
            if (!this.isOpenSegments()) {
                for (FireHydrant fireHydrant : sink) {
                    fireHydrant.swapSegment(null);
                }
            }
            long pushFinishTime = System.nanoTime();
            this.objectMapper.writeValue(descriptorFile, (Object)segment);
            log.info("Segment[%s] of %,d bytes built from %d incremental persist(s) in %,dms; pushed to deep storage in %,dms. Load spec is: %s", new Object[]{identifier, segment.getSize(), indexes.size(), (mergeFinishTime - startTime) / 1000000L, (pushFinishTime - mergeFinishTime) / 1000000L, this.objectMapper.writeValueAsString((Object)segment.getLoadSpec())});
            return segment;
        }
        catch (Exception e) {
            this.metrics.incrementFailedHandoffs();
            log.warn((Throwable)e, "Failed to push merged index for segment[%s].", new Object[]{identifier});
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            log.debug("Appenderator already closed, skipping close() call.", new Object[0]);
            return;
        }
        log.debug("Shutting down...", new Object[0]);
        ArrayList futures = new ArrayList();
        for (Map.Entry entry : this.sinks.entrySet()) {
            futures.add(this.abandonSegment((SegmentIdWithShardSpec)entry.getKey(), (Sink)entry.getValue(), false));
        }
        try {
            Futures.allAsList(futures).get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn((Throwable)e, "Interrupted during close()", new Object[0]);
        }
        catch (ExecutionException e) {
            log.warn((Throwable)e, "Unable to abandon existing segments during close()", new Object[0]);
        }
        try {
            this.shutdownExecutors();
            Preconditions.checkState((this.persistExecutor == null || this.persistExecutor.awaitTermination(365L, TimeUnit.DAYS) ? 1 : 0) != 0, (Object)"persistExecutor not terminated");
            Preconditions.checkState((this.pushExecutor == null || this.pushExecutor.awaitTermination(365L, TimeUnit.DAYS) ? 1 : 0) != 0, (Object)"pushExecutor not terminated");
            Preconditions.checkState((this.intermediateTempExecutor == null || this.intermediateTempExecutor.awaitTermination(365L, TimeUnit.DAYS) ? 1 : 0) != 0, (Object)"intermediateTempExecutor not terminated");
            this.persistExecutor = null;
            this.pushExecutor = null;
            this.intermediateTempExecutor = null;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ISE("Failed to shutdown executors during close()", new Object[0]);
        }
        this.unlockBasePersistDirectory();
    }

    @Override
    public void closeNow() {
        if (!this.closed.compareAndSet(false, true)) {
            log.debug("Appenderator already closed, skipping closeNow() call.", new Object[0]);
            return;
        }
        log.debug("Shutting down immediately...", new Object[0]);
        for (Map.Entry entry : this.sinks.entrySet()) {
            try {
                this.segmentAnnouncer.unannounceSegment(((Sink)entry.getValue()).getSegment());
            }
            catch (Exception e) {
                log.makeAlert((Throwable)e, "Failed to unannounce segment[%s]", new Object[]{this.schema.getDataSource()}).addData("identifier", (Object)((SegmentIdWithShardSpec)entry.getKey()).toString()).emit();
            }
        }
        try {
            this.shutdownExecutors();
            Preconditions.checkState((this.persistExecutor == null || this.persistExecutor.awaitTermination(365L, TimeUnit.DAYS) ? 1 : 0) != 0, (Object)"persistExecutor not terminated");
            Preconditions.checkState((this.intermediateTempExecutor == null || this.intermediateTempExecutor.awaitTermination(365L, TimeUnit.DAYS) ? 1 : 0) != 0, (Object)"intermediateTempExecutor not terminated");
            this.persistExecutor = null;
            this.intermediateTempExecutor = null;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ISE("Failed to shutdown executors during close()", new Object[0]);
        }
    }

    public boolean isOpenSegments() {
        return this.isOpenSegments;
    }

    private void lockBasePersistDirectory() {
        if (this.basePersistDirLock == null) {
            try {
                FileUtils.mkdirp((File)this.tuningConfig.getBasePersistDirectory());
                this.basePersistDirLockChannel = FileChannel.open(this.computeLockFile().toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                this.basePersistDirLock = this.basePersistDirLockChannel.tryLock();
                if (this.basePersistDirLock == null) {
                    throw new ISE("Cannot acquire lock on basePersistDir: %s", new Object[]{this.computeLockFile()});
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void unlockBasePersistDirectory() {
        try {
            if (this.basePersistDirLock != null) {
                this.basePersistDirLock.release();
                this.basePersistDirLockChannel.close();
                this.basePersistDirLock = null;
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void initializeExecutors() {
        int maxPendingPersists = this.tuningConfig.getMaxPendingPersists();
        if (this.persistExecutor == null) {
            this.persistExecutor = MoreExecutors.listeningDecorator((ExecutorService)Execs.newBlockingSingleThreaded((String)("[" + StringUtils.encodeForFormat((String)this.myId) + "]-appenderator-persist"), (int)maxPendingPersists));
        }
        if (this.pushExecutor == null) {
            this.pushExecutor = MoreExecutors.listeningDecorator((ExecutorService)Execs.newBlockingSingleThreaded((String)("[" + StringUtils.encodeForFormat((String)this.myId) + "]-appenderator-merge"), (int)1));
        }
        if (this.intermediateTempExecutor == null) {
            this.intermediateTempExecutor = MoreExecutors.listeningDecorator((ExecutorService)Execs.newBlockingSingleThreaded((String)("[" + StringUtils.encodeForFormat((String)this.myId) + "]-appenderator-abandon"), (int)0));
        }
    }

    private void shutdownExecutors() {
        if (this.persistExecutor != null) {
            this.persistExecutor.shutdownNow();
        }
        if (this.pushExecutor != null) {
            this.pushExecutor.shutdownNow();
        }
        if (this.intermediateTempExecutor != null) {
            this.intermediateTempExecutor.shutdownNow();
        }
    }

    private void resetNextFlush() {
        this.nextFlush = DateTimes.nowUtc().plus((ReadablePeriod)this.tuningConfig.getIntermediatePersistPeriod()).getMillis();
    }

    private Object bootstrapSinksFromDisk() {
        Committed committed;
        Preconditions.checkState((boolean)this.sinks.isEmpty(), (Object)"Already bootstrapped?!");
        File baseDir = this.tuningConfig.getBasePersistDirectory();
        if (!baseDir.exists()) {
            return null;
        }
        File[] files = baseDir.listFiles();
        if (files == null) {
            return null;
        }
        File commitFile = null;
        try {
            this.commitLock.lock();
            commitFile = this.computeCommitFile();
            committed = commitFile.exists() ? (Committed)this.objectMapper.readValue(commitFile, Committed.class) : Committed.nil();
        }
        catch (Exception e) {
            throw new ISE((Throwable)e, "Failed to read commitFile: %s", new Object[]{commitFile});
        }
        finally {
            this.commitLock.unlock();
        }
        int rowsSoFar = 0;
        if (committed.equals(Committed.nil())) {
            log.debug("No previously committed metadata.", new Object[0]);
        } else {
            log.info("Loading partially-persisted segments[%s] from[%s] with commit metadata: %s", new Object[]{String.join((CharSequence)", ", (Iterable<? extends CharSequence>)committed.getHydrants().keySet()), baseDir, committed.getMetadata()});
        }
        for (File sinkDir : files) {
            File identifierFile = new File(sinkDir, IDENTIFIER_FILE_NAME);
            if (!identifierFile.isFile()) continue;
            try {
                SegmentIdWithShardSpec identifier = (SegmentIdWithShardSpec)this.objectMapper.readValue(new File(sinkDir, IDENTIFIER_FILE_NAME), SegmentIdWithShardSpec.class);
                int committedHydrants = committed.getCommittedHydrants(identifier.toString());
                if (committedHydrants <= 0) {
                    log.info("Removing uncommitted segment at [%s].", new Object[]{sinkDir});
                    FileUtils.deleteDirectory((File)sinkDir);
                    continue;
                }
                File[] sinkFiles = sinkDir.listFiles((dir, fileName) -> Ints.tryParse((String)fileName) != null);
                Arrays.sort(sinkFiles, (o1, o2) -> Ints.compare((int)Integer.parseInt(o1.getName()), (int)Integer.parseInt(o2.getName())));
                ArrayList<FireHydrant> hydrants = new ArrayList<FireHydrant>();
                for (File hydrantDir : sinkFiles) {
                    int hydrantNumber = Integer.parseInt(hydrantDir.getName());
                    if (hydrantNumber >= committedHydrants) {
                        log.info("Removing uncommitted partial segment at [%s]", new Object[]{hydrantDir});
                        FileUtils.deleteDirectory((File)hydrantDir);
                        continue;
                    }
                    log.debug("Loading previously persisted partial segment at [%s]", new Object[]{hydrantDir});
                    if (hydrantNumber != hydrants.size()) {
                        throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", new Object[]{hydrants.size(), sinkDir});
                    }
                    hydrants.add(new FireHydrant((Segment)new QueryableIndexSegment(this.indexIO.loadIndex(hydrantDir), identifier.asSegmentId()), hydrantNumber));
                }
                if (committedHydrants != hydrants.size()) {
                    throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", new Object[]{hydrants.size(), sinkDir});
                }
                Sink currSink = new Sink(identifier.getInterval(), this.schema, identifier.getShardSpec(), identifier.getVersion(), this.tuningConfig.getAppendableIndexSpec(), this.tuningConfig.getMaxRowsInMemory(), this.maxBytesTuningConfig, this.useMaxMemoryEstimates, null, hydrants);
                rowsSoFar += currSink.getNumRows();
                this.sinks.put(identifier, currSink);
                this.sinkTimeline.add(currSink.getInterval(), (Object)currSink.getVersion(), identifier.getShardSpec().createChunk((Object)currSink));
                this.segmentAnnouncer.announceSegment(currSink.getSegment());
            }
            catch (IOException e) {
                log.makeAlert((Throwable)e, "Problem loading sink[%s] from disk.", new Object[]{this.schema.getDataSource()}).addData("sinkDir", (Object)sinkDir).emit();
            }
        }
        HashSet loadedSinks = Sets.newHashSet((Iterable)Iterables.transform(this.sinks.keySet(), SegmentIdWithShardSpec::toString));
        Sets.SetView missingSinks = Sets.difference((Set)committed.getHydrants().keySet(), (Set)loadedSinks);
        if (!missingSinks.isEmpty()) {
            throw new ISE("Missing committed sinks [%s]", new Object[]{Joiner.on((String)", ").join((Iterable)missingSinks)});
        }
        this.totalRows.set(rowsSoFar);
        return committed.getMetadata();
    }

    private ListenableFuture<?> abandonSegment(final SegmentIdWithShardSpec identifier, final Sink sink, final boolean removeOnDiskData) {
        if (sink.finishWriting()) {
            this.rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
            this.bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory());
            this.bytesCurrentlyInMemory.addAndGet(-this.calculateSinkMemoryInUsed());
            for (FireHydrant hydrant : sink) {
                if (hydrant.equals(sink.getCurrHydrant())) continue;
                this.bytesCurrentlyInMemory.addAndGet(-this.calculateMMappedHydrantMemoryInUsed(hydrant));
            }
            this.totalRows.addAndGet(-sink.getNumRows());
        }
        this.droppingSinks.add(identifier);
        return Futures.transform(this.pushBarrier(), (Function)new Function<Object, Void>(){

            @Nullable
            public Void apply(@Nullable Object input) {
                if (!AppenderatorImpl.this.sinks.remove(identifier, sink)) {
                    log.error("Sink for segment[%s] no longer valid, not abandoning.", new Object[]{identifier});
                    return null;
                }
                AppenderatorImpl.this.metrics.setSinkCount(AppenderatorImpl.this.sinks.size());
                if (removeOnDiskData) {
                    log.debug("Removing commit metadata for segment[%s].", new Object[]{identifier});
                    try {
                        AppenderatorImpl.this.commitLock.lock();
                        Committed oldCommit = AppenderatorImpl.this.readCommit();
                        if (oldCommit != null) {
                            AppenderatorImpl.this.writeCommit(oldCommit.without(identifier.toString()));
                        }
                    }
                    catch (Exception e) {
                        log.makeAlert((Throwable)e, "Failed to update committed segments[%s]", new Object[]{AppenderatorImpl.this.schema.getDataSource()}).addData("identifier", (Object)identifier.toString()).emit();
                        throw new RuntimeException(e);
                    }
                    finally {
                        AppenderatorImpl.this.commitLock.unlock();
                    }
                }
                try {
                    AppenderatorImpl.this.segmentAnnouncer.unannounceSegment(sink.getSegment());
                }
                catch (Exception e) {
                    log.makeAlert((Throwable)e, "Failed to unannounce segment[%s]", new Object[]{AppenderatorImpl.this.schema.getDataSource()}).addData("identifier", (Object)identifier.toString()).emit();
                }
                AppenderatorImpl.this.droppingSinks.remove(identifier);
                AppenderatorImpl.this.sinkTimeline.remove(sink.getInterval(), (Object)sink.getVersion(), identifier.getShardSpec().createChunk((Object)sink));
                for (FireHydrant hydrant : sink) {
                    if (AppenderatorImpl.this.cache != null) {
                        AppenderatorImpl.this.cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
                    }
                    hydrant.swapSegment(null);
                    AppenderatorImpl.this.persistedHydrantMetadata.remove(hydrant);
                }
                if (removeOnDiskData) {
                    AppenderatorImpl.this.removeDirectory(AppenderatorImpl.this.computePersistDir(identifier));
                }
                log.info("Dropped segment[%s].", new Object[]{identifier});
                return null;
            }
        }, (Executor)this.persistExecutor);
    }

    private Committed readCommit() throws IOException {
        File commitFile = this.computeCommitFile();
        if (commitFile.exists()) {
            return (Committed)this.objectMapper.readValue(commitFile, Committed.class);
        }
        return null;
    }

    private void writeCommit(Committed newCommit) throws IOException {
        File commitFile = this.computeCommitFile();
        this.objectMapper.writeValue(commitFile, (Object)newCommit);
    }

    private File computeCommitFile() {
        return new File(this.tuningConfig.getBasePersistDirectory(), "commit.json");
    }

    private File computeLockFile() {
        return new File(this.tuningConfig.getBasePersistDirectory(), ".lock");
    }

    private File computePersistDir(SegmentIdWithShardSpec identifier) {
        return new File(this.tuningConfig.getBasePersistDirectory(), identifier.toString());
    }

    private File computeIdentifierFile(SegmentIdWithShardSpec identifier) {
        return new File(this.computePersistDir(identifier), IDENTIFIER_FILE_NAME);
    }

    private File computeDescriptorFile(SegmentIdWithShardSpec identifier) {
        return new File(this.computePersistDir(identifier), "descriptor.json");
    }

    private File createPersistDirIfNeeded(SegmentIdWithShardSpec identifier) throws IOException {
        File persistDir = this.computePersistDir(identifier);
        FileUtils.mkdirp((File)persistDir);
        this.objectMapper.writeValue(this.computeIdentifierFile(identifier), (Object)identifier);
        return persistDir;
    }

    private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec identifier) {
        FireHydrant fireHydrant = indexToPersist;
        synchronized (fireHydrant) {
            if (indexToPersist.hasSwapped()) {
                log.info("Segment[%s] hydrant[%s] already swapped. Ignoring request to persist.", new Object[]{identifier, indexToPersist});
                return 0;
            }
            log.debug("Segment[%s], persisting Hydrant[%s]", new Object[]{identifier, indexToPersist});
            try {
                long startTime = System.nanoTime();
                int numRows = indexToPersist.getIndex().size();
                File persistDir = this.createPersistDirIfNeeded(identifier);
                File persistedFile = this.indexMerger.persist(indexToPersist.getIndex(), identifier.getInterval(), new File(persistDir, String.valueOf(indexToPersist.getCount())), this.tuningConfig.getIndexSpecForIntermediatePersists(), this.tuningConfig.getSegmentWriteOutMediumFactory());
                log.info("Flushed in-memory data for segment[%s] spill[%s] to disk in [%,d] ms (%,d rows).", new Object[]{indexToPersist.getSegmentId(), indexToPersist.getCount(), (System.nanoTime() - startTime) / 1000000L, numRows});
                QueryableIndexSegment segmentToSwap = null;
                if (this.isOpenSegments()) {
                    segmentToSwap = new QueryableIndexSegment(this.indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId());
                } else {
                    this.persistedHydrantMetadata.put(indexToPersist, (Pair<File, SegmentId>)new Pair((Object)persistedFile, (Object)indexToPersist.getSegmentId()));
                }
                indexToPersist.swapSegment((Segment)segmentToSwap);
                return numRows;
            }
            catch (IOException e) {
                log.makeAlert("Incremental persist failed", new Object[0]).addData("segment", (Object)identifier.toString()).addData("dataSource", (Object)this.schema.getDataSource()).addData("count", (Object)indexToPersist.getCount()).emit();
                throw new RuntimeException(e);
            }
        }
    }

    private void removeDirectory(File target) {
        if (target.exists()) {
            try {
                FileUtils.deleteDirectory((File)target);
            }
            catch (Exception e) {
                log.makeAlert((Throwable)e, "Failed to remove directory[%s]", new Object[]{this.schema.getDataSource()}).addData("file", (Object)target).emit();
            }
        }
    }

    private int calculateMMappedHydrantMemoryInUsed(FireHydrant hydrant) {
        if (this.skipBytesInMemoryOverheadCheck) {
            return 0;
        }
        int total = 1012;
        if (this.isOpenSegments()) {
            total += hydrant.getSegmentNumDimensionColumns() * 1000 + hydrant.getSegmentNumMetricColumns() * 700 + 600;
        }
        return total;
    }

    private int calculateSinkMemoryInUsed() {
        if (this.skipBytesInMemoryOverheadCheck) {
            return 0;
        }
        return 5000;
    }
}

