/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.UnknownSegmentIdsException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadableInterval;
import org.skife.jdbi.v2.BaseResultSetMapper;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

@ManageLifecycle
public class SqlSegmentsMetadataManager
implements SegmentsMetadataManager {
    /** Class-wide logger; also used to emit alerts from the periodic polling task. */
    private static final EmittingLogger log = new EmittingLogger(SqlSegmentsMetadataManager.class);
    /**
     * Guards the lifecycle and periodic-polling state (the {@code @GuardedBy} fields below). The periodic poll task
     * and {@link #isPollingDatabasePeriodically()} take the read lock; start/stop and on-demand polls take the write
     * lock.
     */
    private final ReentrantReadWriteLock startStopPollLock = new ReentrantReadWriteLock();
    /** Monitor held by {@link #poll()} while {@code doPoll()} runs. */
    private final Object pollLock = new Object();
    /** Mapper used to deserialize {@code DataSegment} payloads read from the segments table. */
    private final ObjectMapper jsonMapper;
    /** Delay between periodic polls; also the freshness window applied to on-demand polls. */
    private final Duration periodicPollDelay;
    /** Supplies the metadata table configuration from which the segments table name is read. */
    private final Supplier<MetadataStorageTablesConfig> dbTables;
    /** Connector through which every SQL statement in this class is issued. */
    private final SQLMetadataConnector connector;
    /** Snapshot built by the most recent {@code doPoll()}; {@code null} until the first poll has finished. */
    private volatile @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = null;
    /**
     * The most recently initiated poll (periodic or on-demand). Reset to {@code null} by
     * {@link #stopPollingDatabasePeriodically()}.
     */
    @Nullable
    private volatile DatabasePoll latestDatabasePoll = null;
    /** Handle of the scheduled periodic poll task; cancelled by {@link #stopPollingDatabasePeriodically()}. */
    @Nullable
    @GuardedBy(value="startStopPollLock")
    private Future<?> periodicPollTaskFuture = null;
    /** Incremented on every successful {@link #startPollingDatabasePeriodically()} call. */
    @GuardedBy(value="startStopPollLock")
    private long startPollingCount = 0L;
    /**
     * Order number of the currently active periodic polling session, or {@code -1} when periodic polling is off.
     * A poll task only polls if its own start order equals this value.
     */
    @GuardedBy(value="startStopPollLock")
    private long currentStartPollingOrder = -1L;
    /** Executor that runs the periodic poll task; created in {@link #start()} and cleared in {@link #stop()}. */
    @Nullable
    @GuardedBy(value="startStopPollLock")
    private ScheduledExecutorService exec = null;

    @Inject
    public SqlSegmentsMetadataManager(ObjectMapper jsonMapper, Supplier<SegmentsMetadataManagerConfig> config, Supplier<MetadataStorageTablesConfig> dbTables, SQLMetadataConnector connector) {
        this.jsonMapper = jsonMapper;
        this.periodicPollDelay = ((SegmentsMetadataManagerConfig)config.get()).getPollDuration().toStandardDuration();
        this.dbTables = dbTables;
        this.connector = connector;
    }

    /**
     * Creates the single-threaded scheduled executor used for periodic polling. Calling this method while the
     * manager is already started is a no-op.
     */
    @LifecycleStart
    public void start() {
        final ReentrantReadWriteLock.WriteLock writeLock = startStopPollLock.writeLock();
        writeLock.lock();
        try {
            if (exec == null) {
                final String threadNameFormat = StringUtils.encodeForFormat(getClass().getName()) + "-Exec--%d";
                exec = Execs.scheduledSingleThreaded(threadNameFormat);
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Shuts down the polling executor. Safe to call when the manager was never started or has already been
     * stopped: in that case there is no executor and the call is a no-op (mirroring the idempotence of
     * {@link #start()}).
     */
    @LifecycleStop
    public void stop() {
        ReentrantReadWriteLock.WriteLock lock = this.startStopPollLock.writeLock();
        lock.lock();
        try {
            if (this.exec == null) {
                // Never started, or already stopped: nothing to shut down.
                return;
            }
            this.exec.shutdownNow();
            this.exec = null;
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Schedules the periodic poll task with zero initial delay and {@code periodicPollDelay} between runs.
     * Does nothing if periodic polling is already active.
     *
     * @throws IllegalStateException if {@link #start()} has not been called (no executor exists)
     */
    @Override
    public void startPollingDatabasePeriodically() {
        final ReentrantReadWriteLock.WriteLock writeLock = startStopPollLock.writeLock();
        writeLock.lock();
        try {
            if (exec == null) {
                throw new IllegalStateException(getClass().getName() + " is not started");
            }
            if (!isPollingDatabasePeriodically()) {
                final PeriodicDatabasePoll newPeriodicPoll = new PeriodicDatabasePoll();
                latestDatabasePoll = newPeriodicPoll;
                // Each polling session gets a fresh order number so that tasks from older sessions skip polling.
                startPollingCount += 1;
                currentStartPollingOrder = startPollingCount;
                final long orderOfThisStart = currentStartPollingOrder;
                final Runnable pollTask = createPollTaskForStartOrder(orderOfThisStart, newPeriodicPoll);
                periodicPollTaskFuture = exec.scheduleWithFixedDelay(pollTask, 0L, periodicPollDelay.getMillis(), TimeUnit.MILLISECONDS);
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Builds the task that the executor runs for one periodic polling session.
     *
     * <p>Each run first waits while the latest poll is an on-demand poll younger than {@code periodicPollDelay},
     * then, under the read lock, polls the database if this session is still the current one.
     *
     * @param startOrder           order number of the polling session this task belongs to
     * @param periodicDatabasePoll poll descriptor whose start timestamp and first-completion future are updated
     * @return the runnable to schedule
     */
    private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePoll periodicDatabasePoll) {
        return () -> {
            try {
                long periodicPollDelayNanos = TimeUnit.MILLISECONDS.toNanos(this.periodicPollDelay.getMillis());
                while (true) {
                    // Read the volatile field exactly once per iteration: it may be replaced or nulled concurrently,
                    // and re-reading it between the instanceof check and the cast could fail.
                    DatabasePoll latestPoll = this.latestDatabasePoll;
                    if (!(latestPoll instanceof OnDemandDatabasePoll)) {
                        break;
                    }
                    long elapsedNanos = ((OnDemandDatabasePoll) latestPoll).nanosElapsedFromInitiation();
                    if (elapsedNanos >= periodicPollDelayNanos) {
                        break;
                    }
                    TimeUnit.NANOSECONDS.sleep(periodicPollDelayNanos - elapsedNanos);
                }
            }
            catch (Exception e) {
                // Waiting is best effort; on any failure (including interruption) fall through to the poll attempt.
                log.debug(e, "Exception found while waiting for next periodic poll");
            }
            // The read lock excludes start/stop (which take the write lock), so the order check below and poll()
            // cannot interleave with a concurrent stopPollingDatabasePeriodically().
            ReentrantReadWriteLock.ReadLock lock = this.startStopPollLock.readLock();
            lock.lock();
            try {
                if (startOrder == this.currentStartPollingOrder) {
                    periodicDatabasePoll.lastPollStartTimestampInMs = System.currentTimeMillis();
                    this.poll();
                    periodicDatabasePoll.firstPollCompletionFuture.complete(null);
                    this.latestDatabasePoll = periodicDatabasePoll;
                } else {
                    log.debug("startOrder = %d does not match currentStartPollingOrder = %d, skipping poll()", startOrder, this.currentStartPollingOrder);
                }
            }
            catch (Throwable t) {
                log.makeAlert(t, "Uncaught exception in %s's polling thread", SqlSegmentsMetadataManager.class).emit();
                // Exceptions are swallowed so the scheduled task keeps running; the first-poll future stays
                // incomplete and may be completed by a later run. Errors are propagated.
                if (!(t instanceof Exception)) {
                    periodicDatabasePoll.firstPollCompletionFuture.completeExceptionally(t);
                    throw t;
                }
            }
            finally {
                lock.unlock();
            }
        };
    }

    /**
     * Reports whether a periodic polling session is active, i.e. whether {@code currentStartPollingOrder} is
     * non-negative. The value is read under the read lock.
     */
    @Override
    public boolean isPollingDatabasePeriodically() {
        final ReentrantReadWriteLock.ReadLock readLock = startStopPollLock.readLock();
        readLock.lock();
        try {
            return currentStartPollingOrder >= 0L;
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Ends the active periodic polling session, if any: cancels the scheduled task without interrupting it,
     * clears the latest-poll marker and resets the session order to {@code -1}. The last snapshot is kept.
     */
    @Override
    public void stopPollingDatabasePeriodically() {
        final ReentrantReadWriteLock.WriteLock writeLock = startStopPollLock.writeLock();
        writeLock.lock();
        try {
            if (isPollingDatabasePeriodically()) {
                periodicPollTaskFuture.cancel(false);
                latestDatabasePoll = null;
                currentStartPollingOrder = -1L;
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Ensures a usable snapshot exists: returns immediately when the latest poll is acceptable, otherwise takes
     * the write lock, re-checks, and performs a new on-demand poll if the latest poll is still not acceptable.
     */
    private void useLatestIfWithinDelayOrPerformNewDatabasePoll() {
        if (!useLatestSnapshotIfWithinDelay()) {
            final ReentrantReadWriteLock.WriteLock writeLock = startStopPollLock.writeLock();
            writeLock.lock();
            try {
                // Another thread may have polled while this one was waiting for the lock.
                if (!useLatestSnapshotIfWithinDelay()) {
                    final OnDemandDatabasePoll freshPoll = new OnDemandDatabasePoll();
                    latestDatabasePoll = freshPoll;
                    doOnDemandPoll(freshPoll);
                }
            }
            finally {
                writeLock.unlock();
            }
        }
    }

    /**
     * Decides whether the latest poll can serve the current request, waiting for it to complete if so.
     *
     * @return {@code true} if the latest poll is periodic, or is an on-demand poll initiated less than
     *         {@code periodicPollDelay} ago (after waiting on its completion future); {@code false} if there is no
     *         latest poll or the on-demand poll is older than that
     */
    @VisibleForTesting
    boolean useLatestSnapshotIfWithinDelay() {
        final DatabasePoll currentPoll = this.latestDatabasePoll;
        if (currentPoll instanceof PeriodicDatabasePoll) {
            Futures.getUnchecked(((PeriodicDatabasePoll) currentPoll).firstPollCompletionFuture);
            return true;
        }
        if (!(currentPoll instanceof OnDemandDatabasePoll)) {
            assert (currentPoll == null);
            return false;
        }
        final long freshnessWindowNanos = TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis());
        final OnDemandDatabasePoll onDemandPoll = (OnDemandDatabasePoll) currentPoll;
        if (onDemandPoll.nanosElapsedFromInitiation() >= freshnessWindowNanos) {
            // Stale on-demand poll: the caller has to poll again.
            return false;
        }
        Futures.getUnchecked(onDemandPoll.pollCompletionFuture);
        return true;
    }

    /**
     * Makes sure the snapshot reflects a poll that started after this method was entered: either such a poll
     * already happened while this thread waited for the write lock, or a new on-demand poll is performed here.
     *
     * <p>The write lock is released on every exit path, including when the poll throws.
     */
    @VisibleForTesting
    void forceOrWaitOngoingDatabasePoll() {
        long checkStartTime = System.currentTimeMillis();
        ReentrantReadWriteLock.WriteLock lock = this.startStopPollLock.writeLock();
        lock.lock();
        try {
            DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
            // A periodic poll that started after we began waiting for the lock is recent enough.
            if (latestDatabasePoll instanceof PeriodicDatabasePoll && ((PeriodicDatabasePoll) latestDatabasePoll).lastPollStartTimestampInMs > checkStartTime) {
                return;
            }
            // Likewise for an on-demand poll initiated after we began waiting.
            if (latestDatabasePoll instanceof OnDemandDatabasePoll) {
                // NOTE(review): initiationTimeNanos comes from System.nanoTime() while checkStartTime comes from
                // System.currentTimeMillis(); these clocks have different origins, so this comparison looks
                // unreliable. Kept as-is to preserve behavior - confirm and fix separately.
                long checkStartTimeNanos = TimeUnit.MILLISECONDS.toNanos(checkStartTime);
                OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll) latestDatabasePoll;
                if (latestOnDemandPoll.initiationTimeNanos > checkStartTimeNanos) {
                    return;
                }
            }
            // Force a database poll.
            OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll();
            this.latestDatabasePoll = onDemandDatabasePoll;
            this.doOnDemandPoll(onDemandDatabasePoll);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Runs {@link #poll()} for the given on-demand poll and records the outcome on its completion future:
     * completed normally on success, completed exceptionally (and the failure rethrown) otherwise.
     *
     * @param onDemandPoll descriptor of the poll being performed
     */
    private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll) {
        try {
            poll();
            onDemandPoll.pollCompletionFuture.complete(null);
        }
        catch (Throwable pollFailure) {
            // Waiters on the future must see the failure as well as the direct caller.
            onDemandPoll.pollCompletionFuture.completeExceptionally(pollFailure);
            throw pollFailure;
        }
    }

    /**
     * Sets {@code used=true} on the row(s) with the given segment id.
     *
     * @param segmentId id of the segment to mark as used
     * @return {@code true} if at least one row was updated
     * @throws RuntimeException any failure from the database call, after logging it
     */
    @Override
    public boolean markSegmentAsUsed(String segmentId) {
        try {
            final int updatedRows = (Integer) connector.getDBI().withHandle(handle -> {
                final String sql = StringUtils.format("UPDATE %s SET used=true WHERE id = :id", getSegmentsTable());
                return handle.createStatement(sql).bind("id", segmentId).execute();
            });
            return updatedRows > 0;
        }
        catch (RuntimeException failure) {
            log.error(failure, "Exception marking segment %s as used", segmentId);
            throw failure;
        }
    }

    /**
     * Marks as used all unused, non-overshadowed segments of the data source, with no interval restriction.
     *
     * @return the count reported by {@code doMarkAsUsedNonOvershadowedSegments}
     */
    @Override
    public int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource) {
        final Interval noIntervalRestriction = null;
        return doMarkAsUsedNonOvershadowedSegments(dataSource, noIntervalRestriction);
    }

    /**
     * Marks as used the unused, non-overshadowed segments of the data source that lie within the interval.
     *
     * @param interval required; a {@code null} value is rejected by {@code Preconditions.checkNotNull}
     * @return the count reported by {@code doMarkAsUsedNonOvershadowedSegments}
     */
    @Override
    public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval) {
        final Interval requiredInterval = Preconditions.checkNotNull(interval);
        return doMarkAsUsedNonOvershadowedSegments(dataSource, requiredInterval);
    }

    /**
     * Reads the segments of a data source in a read-only transaction and marks as used those unused segments
     * that are not overshadowed.
     *
     * @param dataSourceName data source whose segments are examined
     * @param interval       when non-null, the query is limited to rows overlapping it and only unused segments
     *                       whose interval it contains are candidates; when null, all rows are considered
     * @return the value returned by {@code markNonOvershadowedSegmentsAsUsed}
     */
    private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval) {
        // Both lists are filled as a side effect of the row mapper inside the transaction below.
        ArrayList usedSegmentsOverlappingInterval = new ArrayList();
        ArrayList<DataSegment> unusedSegmentsInInterval = new ArrayList<DataSegment>();
        this.connector.inReadOnlyTransaction((handle, status) -> {
            String queryString = StringUtils.format((String)"SELECT used, payload FROM %1$s WHERE dataSource = :dataSource", (Object[])new Object[]{this.getSegmentsTable()});
            if (interval != null) {
                // The "end" column name is wrapped in the connector's quote string.
                queryString = queryString + StringUtils.format((String)" AND start < :end AND %1$send%1$s > :start", (Object[])new Object[]{this.connector.getQuoteString()});
            }
            Query query = (Query)handle.createQuery(queryString).setFetchSize(this.connector.getStreamingFetchSize()).bind("dataSource", dataSourceName);
            if (interval != null) {
                query = (Query)((Query)query.bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString());
            }
            // The mapper sorts each row into one of the two lists and yields null; its results are discarded.
            query = query.map((index, resultSet, context) -> {
                DataSegment segment = (DataSegment)JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])resultSet.getBytes("payload"), DataSegment.class);
                if (resultSet.getBoolean("used")) {
                    usedSegmentsOverlappingInterval.add(segment);
                } else if (interval == null || interval.contains((ReadableInterval)segment.getInterval())) {
                    unusedSegmentsInInterval.add(segment);
                }
                return null;
            });
            // Drain the iterator so that the mapper runs for every row.
            SqlSegmentsMetadataManager.consume(query.iterator());
            return null;
        });
        // The timeline holds both used and candidate unused segments so overshadowing is judged across all of them.
        VersionedIntervalTimeline versionedIntervalTimeline = VersionedIntervalTimeline.forSegments((Iterator)Iterators.concat(usedSegmentsOverlappingInterval.iterator(), unusedSegmentsInInterval.iterator()));
        return this.markNonOvershadowedSegmentsAsUsed(unusedSegmentsInInterval, (VersionedIntervalTimeline<String, DataSegment>)versionedIntervalTimeline);
    }

    /**
     * Exhausts the iterator, discarding every element. Used to force evaluation of lazily mapped query results.
     *
     * @param iterator iterator to drain
     */
    private static void consume(Iterator<?> iterator) {
        for (; iterator.hasNext(); iterator.next()) {
            // Elements are intentionally ignored; only the iteration side effects matter.
        }
    }

    /**
     * Marks as used those of the given unused segments that the timeline does not report as overshadowed.
     *
     * @param unusedSegments candidate segments, in the order their ids are submitted for update
     * @param timeline       timeline used to test each candidate for overshadowing
     * @return the value returned by {@code markSegmentsAsUsed} for the selected ids
     */
    private int markNonOvershadowedSegmentsAsUsed(List<DataSegment> unusedSegments, VersionedIntervalTimeline<String, DataSegment> timeline) {
        final List<String> idsToMark = unusedSegments
            .stream()
            .filter(candidate -> !timeline.isOvershadowed(candidate.getInterval(), candidate.getVersion(), candidate))
            .map(candidate -> candidate.getId().toString())
            .collect(Collectors.toList());
        return markSegmentsAsUsed(idsToMark);
    }

    /**
     * Marks as used those of the given segments that are currently unused and not overshadowed.
     *
     * <p>Within one read-only transaction the unused segments are loaded, their intervals condensed, and the used
     * segments overlapping those intervals fetched; a timeline over both sets decides overshadowing.
     *
     * @param dataSource data source the segment ids belong to
     * @param segmentIds ids of the segments to consider
     * @return the value returned by {@code markNonOvershadowedSegmentsAsUsed}
     * @throws UnknownSegmentIdsException if that exception is the root cause of a failure inside the transaction
     */
    @Override
    public int markAsUsedNonOvershadowedSegments(String dataSource, Set<String> segmentIds) throws UnknownSegmentIdsException {
        try {
            final Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>> segmentsAndTimeline = connector.inReadOnlyTransaction((handle, status) -> {
                final List<DataSegment> unused = retrieveUnusedSegments(dataSource, segmentIds, handle);
                final List<Interval> condensedIntervals = JodaUtils.condenseIntervals(unused.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
                final Iterator<DataSegment> overlappingUsed = retrieveUsedSegmentsOverlappingIntervals(dataSource, condensedIntervals, handle);
                final VersionedIntervalTimeline<String, DataSegment> combinedTimeline = VersionedIntervalTimeline.forSegments(Iterators.concat(overlappingUsed, unused.iterator()));
                return new Pair<>(unused, combinedTimeline);
            });
            return markNonOvershadowedSegmentsAsUsed(segmentsAndTimeline.lhs, segmentsAndTimeline.rhs);
        }
        catch (Exception e) {
            // The transaction wrapper may wrap the checked exception; unwrap it so callers see the declared type.
            final Throwable rootCause = Throwables.getRootCause(e);
            if (!(rootCause instanceof UnknownSegmentIdsException)) {
                throw e;
            }
            throw (UnknownSegmentIdsException) rootCause;
        }
    }

    /**
     * Looks up each requested segment id in the given data source and returns those whose row is marked unused.
     *
     * @param dataSource data source the ids must belong to
     * @param segmentIds ids to look up, one query per id
     * @param handle     open handle on which the queries are created
     * @return the deserialized segments whose first matching row has {@code used = false}
     * @throws UnknownSegmentIdsException if at least one id has no row at all; it carries all such ids
     */
    private List<DataSegment> retrieveUnusedSegments(String dataSource, Set<String> segmentIds, Handle handle) throws UnknownSegmentIdsException {
        // Filled as a side effect of the stream below with every id for which no row exists.
        ArrayList<String> unknownSegmentIds = new ArrayList<String>();
        List<DataSegment> segments = segmentIds.stream().map(segmentId -> {
            ResultIterator segmentResultIterator = ((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND id = :id", (Object[])new Object[]{this.getSegmentsTable()})).bind("dataSource", dataSource)).bind("id", segmentId)).map((index, resultSet, context) -> {
                try {
                    if (!resultSet.getBoolean("used")) {
                        return (DataSegment)this.jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
                    }
                    // Used segments map to null and are dropped by the nonNull filter below.
                    return null;
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).iterator();
            if (!segmentResultIterator.hasNext()) {
                unknownSegmentIds.add((String)segmentId);
                return null;
            }
            DataSegment segment = (DataSegment)segmentResultIterator.next();
            // Only the first row is used; additional rows for the same id are reported but otherwise ignored.
            if (segmentResultIterator.hasNext()) {
                log.error("There is more than one row corresponding to segment id [%s] in data source [%s] in the database", new Object[]{segmentId, dataSource});
            }
            return segment;
        }).filter(Objects::nonNull).collect(Collectors.toList());
        if (!unknownSegmentIds.isEmpty()) {
            throw new UnknownSegmentIdsException(unknownSegmentIds);
        }
        return segments;
    }

    /**
     * Returns an iterator over the used segments of the data source that overlap any of the given intervals.
     * One query is built per interval (rows with {@code start < :end}, end {@code > :start} and
     * {@code used = true}) and the per-interval results are concatenated via {@code flatMap}.
     *
     * @param dataSource data source to query
     * @param intervals  intervals to test for overlap
     * @param handle     open handle on which the queries are created
     * @return iterator over the deserialized segment payloads
     */
    private Iterator<DataSegment> retrieveUsedSegmentsOverlappingIntervals(String dataSource, Collection<Interval> intervals, Handle handle) {
        return intervals.stream().flatMap(interval -> {
            // The query is created inside the Iterable's lambda, i.e. only when its iterator is requested.
            Iterable segmentResultIterable = () -> ((Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start AND used = true", (Object[])new Object[]{this.getSegmentsTable(), this.connector.getQuoteString()})).setFetchSize(this.connector.getStreamingFetchSize()).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).map((index, resultSet, context) -> (DataSegment)JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])resultSet.getBytes("payload"), DataSegment.class)).iterator();
            return StreamSupport.stream(segmentResultIterable.spliterator(), false);
        }).iterator();
    }

    /**
     * Sets {@code used=true} for every given segment id in a single batch.
     *
     * <p>The id is passed as a bound parameter of a prepared batch rather than spliced into the SQL text, so ids
     * containing quotes or other SQL metacharacters cannot break or alter the statement.
     *
     * @param segmentIds ids of the segments to mark as used
     * @return number of ids for which at least one row was updated; {@code 0} if the list is empty
     */
    private int markSegmentsAsUsed(List<String> segmentIds) {
        if (segmentIds.isEmpty()) {
            log.info("No segments found to update!");
            return 0;
        }
        return (Integer) this.connector.getDBI().withHandle(handle -> {
            org.skife.jdbi.v2.PreparedBatch batch = handle.prepareBatch(StringUtils.format("UPDATE %s SET used=true WHERE id = :id", this.getSegmentsTable()));
            for (String segmentId : segmentIds) {
                batch.add().bind("id", segmentId);
            }
            // Batch results come back in the order the parts were added, matching segmentIds.
            int[] segmentChanges = batch.execute();
            return SqlSegmentsMetadataManager.computeNumChangedSegments(segmentIds, segmentChanges);
        });
    }

    /**
     * Sets {@code used=false} on every segment row of the data source.
     *
     * @param dataSource data source whose segments are marked as unused
     * @return number of rows updated
     * @throws RuntimeException any failure from the database call, after logging it
     */
    @Override
    public int markAsUnusedAllSegmentsInDataSource(String dataSource) {
        try {
            final int updatedRows = (Integer) connector.getDBI().withHandle(handle -> {
                final String sql = StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable());
                return handle.createStatement(sql).bind("dataSource", dataSource).execute();
            });
            return updatedRows;
        }
        catch (RuntimeException failure) {
            log.error(failure, "Exception marking all segments as unused in data source [%s]", dataSource);
            throw failure;
        }
    }

    /**
     * Marks a single segment as unused in the database.
     *
     * @param segmentId id of the segment
     * @return {@code true} if at least one row was updated
     * @throws RuntimeException any failure from the database call, after logging it
     */
    @Override
    public boolean markSegmentAsUnused(String segmentId) {
        final boolean updated;
        try {
            updated = markSegmentAsUnusedInDatabase(segmentId);
        }
        catch (RuntimeException failure) {
            log.error(failure, "Exception marking segment [%s] as unused", segmentId);
            throw failure;
        }
        return updated;
    }

    /**
     * Sets {@code used=false} for the given segment ids of a data source in a single batch.
     *
     * <p>Both the data source name and the segment id are caller-supplied, so they are passed as bound parameters
     * of a prepared batch instead of being spliced into the SQL text.
     *
     * @param dataSourceName data source the segments belong to
     * @param segmentIds     ids of the segments to mark as unused
     * @return number of ids for which at least one row was updated; {@code 0} if the set is empty
     * @throws RuntimeException wrapping any failure from the database call
     */
    @Override
    public int markSegmentsAsUnused(String dataSourceName, Set<String> segmentIds) {
        if (segmentIds.isEmpty()) {
            return 0;
        }
        // Copy into a list so batch results can be matched to ids by position.
        ArrayList<String> segmentIdList = new ArrayList<String>(segmentIds);
        try {
            return (Integer) this.connector.getDBI().withHandle(handle -> {
                org.skife.jdbi.v2.PreparedBatch batch = handle.prepareBatch(StringUtils.format("UPDATE %s SET used=false WHERE datasource = :datasource AND id = :id", this.getSegmentsTable()));
                for (String segmentId : segmentIdList) {
                    batch.add().bind("datasource", dataSourceName).bind("id", segmentId);
                }
                int[] segmentChanges = batch.execute();
                return SqlSegmentsMetadataManager.computeNumChangedSegments(segmentIdList, segmentChanges);
            });
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sets {@code used=false} on the segments of a data source whose start is at or after the interval start
     * and whose end is at or before the interval end.
     *
     * @param dataSourceName data source whose segments are updated
     * @param interval       interval bounding the affected segments
     * @return number of rows updated
     * @throws RuntimeException wrapping any failure from the database call
     */
    @Override
    public int markAsUnusedSegmentsInInterval(String dataSourceName, Interval interval) {
        try {
            final Integer updatedRows = (Integer) connector.getDBI().withHandle(handle -> {
                final String sql = StringUtils.format("UPDATE %s SET used=false WHERE datasource = :datasource AND start >= :start AND %2$send%2$s <= :end", getSegmentsTable(), connector.getQuoteString());
                return handle.createStatement(sql)
                             .bind("datasource", dataSourceName)
                             .bind("start", interval.getStart().toString())
                             .bind("end", interval.getEnd().toString())
                             .execute();
            });
            return updatedRows;
        }
        catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Issues the update that sets {@code used=false} for one segment id and sanity-checks the row count.
     *
     * @param segmentId id of the segment
     * @return {@code true} if at least one row was updated
     */
    private boolean markSegmentAsUnusedInDatabase(String segmentId) {
        final int updatedRows = (Integer) connector.getDBI().withHandle(handle -> {
            final String sql = StringUtils.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable());
            return handle.createStatement(sql).bind("segmentID", segmentId).execute();
        });
        if (updatedRows > 1) {
            log.error("More than one row updated for segment id [%s]: %d, there may be more than one row for the segment id in the database", segmentId, updatedRows);
        } else if (updatedRows < 0) {
            log.assertionError("Negative number of rows updated for segment id [%s]: %d", segmentId, updatedRows);
        }
        return updatedRows > 0;
    }

    /**
     * Counts how many batch entries updated at least one row, reporting suspicious row counts on the way.
     *
     * @param segmentIds     ids in the same order as the batch entries; used only for log messages
     * @param segmentChanges per-entry updated-row counts returned by the batch
     * @return number of entries whose updated-row count is positive
     */
    static int computeNumChangedSegments(List<String> segmentIds, int[] segmentChanges) {
        int changedCount = 0;
        int position = 0;
        for (int updatedRows : segmentChanges) {
            if (updatedRows > 1) {
                log.error("More than one row updated for segment id [%s]: %d, there may be more than one row for the segment id in the database", segmentIds.get(position), updatedRows);
            } else if (updatedRows < 0) {
                log.assertionError("Negative number of rows updated for segment id [%s]: %d", segmentIds.get(position), updatedRows);
            }
            if (updatedRows > 0) {
                changedCount++;
            }
            position++;
        }
        return changedCount;
    }

    /**
     * Looks up one data source in the current snapshot, polling first if the snapshot is not fresh enough.
     *
     * @return whatever the snapshot's {@code getDataSource} returns for the name (declared nullable)
     */
    @Override
    @Nullable
    public ImmutableDruidDataSource getImmutableDataSourceWithUsedSegments(String dataSourceName) {
        final DataSourcesSnapshot snapshot = getSnapshotOfDataSourcesWithAllUsedSegments();
        return snapshot.getDataSource(dataSourceName);
    }

    /** Returns all data sources of the current snapshot, polling first if the snapshot is not fresh enough. */
    @Override
    public Collection<ImmutableDruidDataSource> getImmutableDataSourcesWithAllUsedSegments() {
        final DataSourcesSnapshot snapshot = getSnapshotOfDataSourcesWithAllUsedSegments();
        return snapshot.getDataSourcesWithAllUsedSegments();
    }

    /** Returns the overshadowed segment ids of the current snapshot, polling first if it is not fresh enough. */
    @Override
    public Set<SegmentId> getOvershadowedSegments() {
        final DataSourcesSnapshot snapshot = getSnapshotOfDataSourcesWithAllUsedSegments();
        return snapshot.getOvershadowedSegments();
    }

    /**
     * Returns the current snapshot after making sure the latest poll is acceptable (or performing a new
     * on-demand poll when it is not).
     */
    @Override
    public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments() {
        useLatestIfWithinDelayOrPerformNewDatabasePoll();
        final DataSourcesSnapshot currentSnapshot = dataSourcesSnapshot;
        return currentSnapshot;
    }

    /** Returns the snapshot field as-is, without triggering or waiting for any poll. */
    @VisibleForTesting
    DataSourcesSnapshot getDataSourcesSnapshot() {
        final DataSourcesSnapshot currentSnapshot = dataSourcesSnapshot;
        return currentSnapshot;
    }

    /** Returns the latest-poll marker as-is; may be {@code null}. */
    @VisibleForTesting
    DatabasePoll getLatestDatabasePoll() {
        final DatabasePoll latestPoll = latestDatabasePoll;
        return latestPoll;
    }

    /**
     * Returns an iterable over all used segments in the current snapshot, polling first if the snapshot is not
     * fresh enough.
     */
    @Override
    public Iterable<DataSegment> iterateAllUsedSegments() {
        useLatestIfWithinDelayOrPerformNewDatabasePoll();
        final DataSourcesSnapshot currentSnapshot = dataSourcesSnapshot;
        return currentSnapshot.iterateAllUsedSegmentsInSnapshot();
    }

    /**
     * Returns the non-overshadowed used segments of a data source within an interval, considering only complete
     * partitions.
     *
     * @param datasource     data source whose timeline is consulted
     * @param interval       interval to search
     * @param requiresLatest if {@code true}, a poll started after this call is required; otherwise a sufficiently
     *                       fresh existing poll is accepted
     * @return absent if the snapshot has no timeline for the data source, otherwise the matching segments
     */
    @Override
    public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource, Interval interval, boolean requiresLatest) {
        if (!requiresLatest) {
            useLatestIfWithinDelayOrPerformNewDatabasePoll();
        } else {
            forceOrWaitOngoingDatabasePoll();
        }
        final VersionedIntervalTimeline<String, DataSegment> timelineOfDatasource = dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource);
        final Optional<VersionedIntervalTimeline<String, DataSegment>> maybeTimeline = Optional.fromNullable(timelineOfDatasource);
        return maybeTimeline.transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE));
    }

    /**
     * Queries the segments table directly (not the snapshot) for the distinct data source names.
     *
     * @return a new {@code HashSet} containing the {@code datasource} value of every returned row
     */
    @Override
    public Set<String> retrieveAllDataSourceNames() {
        // fold() accumulates the "datasource" column of each row into the HashSet passed as the seed.
        return (Set)this.connector.getDBI().withHandle(handle -> (Set)handle.createQuery(StringUtils.format((String)"SELECT DISTINCT(datasource) FROM %s", (Object[])new Object[]{this.getSegmentsTable()})).fold(new HashSet(), (druidDataSources, stringObjectMap, foldController, statementContext) -> {
            druidDataSources.add(MapUtils.getString((Map)stringObjectMap, (String)"datasource"));
            return druidDataSources;
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /** Polls the segments table and replaces the snapshot; concurrent callers are serialized on {@code pollLock}. */
    @Override
    public void poll() {
        synchronized (pollLock) {
            doPoll();
        }
    }

    /**
     * Reads the payload of every row with {@code used=true} from the segments table in a read-only transaction
     * and publishes a new {@code DataSourcesSnapshot} built from the segments that could be deserialized.
     *
     * @throws NullPointerException if the transaction returns {@code null} (raised by
     *         {@code Preconditions.checkNotNull}); the previous snapshot is then left in place
     */
    @GuardedBy(value="pollLock")
    private void doPoll() {
        log.debug("Starting polling of segment table", new Object[0]);
        List<DataSegment> segments = this.connector.inReadOnlyTransaction(new TransactionCallback<List<DataSegment>>(){

            public List<DataSegment> inTransaction(Handle handle, TransactionStatus status) {
                return handle.createQuery(StringUtils.format((String)"SELECT payload FROM %s WHERE used=true", (Object[])new Object[]{SqlSegmentsMetadataManager.this.getSegmentsTable()})).setFetchSize(SqlSegmentsMetadataManager.this.connector.getStreamingFetchSize()).map((ResultSetMapper)new ResultSetMapper<DataSegment>(){

                    public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException {
                        try {
                            DataSegment segment = (DataSegment)SqlSegmentsMetadataManager.this.jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
                            // Reuse the instance from the previous snapshot when the same segment id is already known.
                            return SqlSegmentsMetadataManager.this.replaceWithExistingSegmentIfPresent(segment);
                        }
                        catch (IOException e) {
                            // An unreadable payload raises an alert and maps to null; nulls are filtered out below.
                            log.makeAlert((Throwable)e, "Failed to read segment from db.", new Object[0]).emit();
                            return null;
                        }
                    }
                }).list();
            }
        });
        Preconditions.checkNotNull(segments, (Object)"Unexpected 'null' when polling segments from the db, aborting snapshot update.");
        ImmutableMap<String, String> dataSourceProperties = SqlSegmentsMetadataManager.createDefaultDataSourceProperties();
        if (segments.isEmpty()) {
            log.info("No segments found in the database!", new Object[0]);
        } else {
            // The logged count includes entries that failed to deserialize (nulls).
            log.info("Polled and found %,d segments in the database", new Object[]{segments.size()});
        }
        this.dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(Iterables.filter(segments, Objects::nonNull), dataSourceProperties);
    }

    /** Builds the properties attached to a new snapshot: a single {@code "created"} entry with the current UTC time. */
    private static ImmutableMap<String, String> createDefaultDataSourceProperties() {
        final String createdAt = DateTimes.nowUtc().toString();
        return ImmutableMap.of("created", createdAt);
    }

    /**
     * Returns the instance of the segment already held by the current snapshot, if there is one, so that
     * successive polls share {@code DataSegment} objects instead of keeping duplicates.
     *
     * @param segment freshly deserialized segment
     * @return the snapshot's existing instance with the same id, or {@code segment} itself if the snapshot, the
     *         data source or the segment is not present
     */
    private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment) {
        final DataSourcesSnapshot snapshot = this.dataSourcesSnapshot;
        if (snapshot != null) {
            final ImmutableDruidDataSource knownDataSource = snapshot.getDataSource(segment.getDataSource());
            if (knownDataSource != null) {
                final DataSegment knownSegment = knownDataSource.getSegment(segment.getId());
                if (knownSegment != null) {
                    return knownSegment;
                }
            }
        }
        return segment;
    }

    /** Returns the segments table name from the current table configuration. */
    private String getSegmentsTable() {
        final MetadataStorageTablesConfig tablesConfig = dbTables.get();
        return tablesConfig.getSegmentsTable();
    }

    /**
     * Returns the intervals of up to {@code limit} unused segments of a data source whose end is at or before
     * {@code maxEndTime}, ordered by start and then end.
     *
     * <p>The result iterator is closed on every exit path, so the underlying statement and result set are
     * released even when iteration stops early at {@code limit} or fails.
     *
     * @param dataSource data source to query
     * @param maxEndTime upper bound (inclusive) for the segment end
     * @param limit      maximum number of intervals to return
     * @return the intervals found, at most {@code limit} of them
     * @throws RuntimeException wrapping any failure while reading a row
     */
    @Override
    public List<Interval> getUnusedSegmentIntervals(final String dataSource, final DateTime maxEndTime, final int limit) {
        return this.connector.inReadOnlyTransaction(new TransactionCallback<List<Interval>>(){

            public List<Interval> inTransaction(Handle handle, TransactionStatus status) {
                ResultIterator<Interval> iter = handle
                    .createQuery(StringUtils.format("SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource AND %2$send%2$s <= :end AND used = false ORDER BY start, %2$send%2$s", SqlSegmentsMetadataManager.this.getSegmentsTable(), SqlSegmentsMetadataManager.this.connector.getQuoteString()))
                    .setFetchSize(SqlSegmentsMetadataManager.this.connector.getStreamingFetchSize())
                    .setMaxRows(limit)
                    .bind("dataSource", dataSource)
                    .bind("end", maxEndTime.toString())
                    .map(new BaseResultSetMapper<Interval>(){

                        protected Interval mapInternal(int index, Map<String, Object> row) {
                            return new Interval(DateTimes.of((String)row.get("start")), DateTimes.of((String)row.get("end")));
                        }
                    })
                    .iterator();
                try {
                    List<Interval> result = Lists.newArrayListWithCapacity(limit);
                    for (int i = 0; i < limit && iter.hasNext(); ++i) {
                        try {
                            result.add(iter.next());
                        }
                        catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                    return result;
                }
                finally {
                    // Iteration may stop before the cursor is exhausted, so close it explicitly.
                    iter.close();
                }
            }
        });
    }

    /**
     * Descriptor of a poll triggered by a caller rather than by the periodic schedule. Records when it was
     * initiated (per {@code System.nanoTime()}) and exposes a future that is completed when the poll finishes.
     */
    @VisibleForTesting
    static class OnDemandDatabasePoll
    implements DatabasePoll {
        final long initiationTimeNanos;
        final CompletableFuture<Void> pollCompletionFuture;

        OnDemandDatabasePoll() {
            this.initiationTimeNanos = System.nanoTime();
            this.pollCompletionFuture = new CompletableFuture<>();
        }

        /** Returns the nanoseconds elapsed since this poll was initiated. */
        long nanosElapsedFromInitiation() {
            final long now = System.nanoTime();
            return now - initiationTimeNanos;
        }
    }

    /**
     * Descriptor of one periodic polling session. Holds a future completed after the session's first successful
     * poll and the wall-clock start time of its most recent poll ({@code -1} until a poll has started).
     */
    @VisibleForTesting
    static class PeriodicDatabasePoll
    implements DatabasePoll {
        final CompletableFuture<Void> firstPollCompletionFuture;
        long lastPollStartTimestampInMs;

        PeriodicDatabasePoll() {
            this.firstPollCompletionFuture = new CompletableFuture<>();
            this.lastPollStartTimestampInMs = -1L;
        }
    }

    /**
     * Marker type for the value stored in {@code latestDatabasePoll}; implemented by
     * {@link OnDemandDatabasePoll} and {@link PeriodicDatabasePoll}.
     */
    private static interface DatabasePoll {
    }
}

