/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.metadata;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.metadata.RetryTransactionException;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInterval;
import org.joda.time.chrono.ISOChronology;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.PreparedBatchPart;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;

public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStorageCoordinator {
    private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class);
    private static final int MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE = 100;
    private static final String UPGRADED_PENDING_SEGMENT_PREFIX = "upgraded_to_version__";
    private final ObjectMapper jsonMapper;
    private final MetadataStorageTablesConfig dbTables;
    private final SQLMetadataConnector connector;

    @Inject
    public IndexerSQLMetadataStorageCoordinator(ObjectMapper jsonMapper, MetadataStorageTablesConfig dbTables, SQLMetadataConnector connector) {
        this.jsonMapper = jsonMapper;
        this.dbTables = dbTables;
        this.connector = connector;
    }

    @LifecycleStart
    public void start() {
        this.connector.createDataSourceTable();
        this.connector.createPendingSegmentsTable();
        this.connector.createSegmentTable();
        this.connector.createUpgradeSegmentsTable();
    }

    @Override
    public Collection<DataSegment> retrieveUsedSegmentsForIntervals(String dataSource, List<Interval> intervals, Segments visibility) {
        if (intervals == null || intervals.isEmpty()) {
            throw new IAE("null/empty intervals", new Object[0]);
        }
        return this.doRetrieveUsedSegments(dataSource, intervals, visibility);
    }

    @Override
    public Collection<DataSegment> retrieveAllUsedSegments(String dataSource, Segments visibility) {
        return this.doRetrieveUsedSegments(dataSource, Collections.emptyList(), visibility);
    }

    private Collection<DataSegment> doRetrieveUsedSegments(String dataSource, List<Interval> intervals, Segments visibility) {
        return (Collection)this.connector.retryWithHandle(handle -> {
            if (visibility == Segments.ONLY_VISIBLE) {
                SegmentTimeline timeline = this.getTimelineForIntervalsWithHandle(handle, dataSource, intervals);
                return timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
            }
            return this.retrieveAllUsedSegmentsForIntervalsWithHandle(handle, dataSource, intervals);
        });
    }

    public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, List<Interval> intervals) {
        StringBuilder queryBuilder = new StringBuilder("SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true");
        boolean compareIntervalEndpointsAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
        SqlSegmentsMetadataQuery.IntervalMode intervalMode = SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS;
        SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(queryBuilder, compareIntervalEndpointsAsString ? intervals : Collections.emptyList(), intervalMode, this.connector);
        String queryString = StringUtils.format((String)queryBuilder.toString(), (Object[])new Object[]{this.dbTables.getSegmentsTable()});
        return (List)this.connector.retryWithHandle(handle -> {
            Query query = (Query)handle.createQuery(queryString).bind("dataSource", dataSource);
            if (compareIntervalEndpointsAsString) {
                SqlSegmentsMetadataQuery.bindQueryIntervals((Query<Map<String, Object>>)query, intervals);
            }
            List segmentsWithCreatedDates = query.map((index, r, ctx) -> new Pair((Object)((DataSegment)JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])r.getBytes("payload"), DataSegment.class)), (Object)r.getString("created_date"))).list();
            if (intervals.isEmpty() || compareIntervalEndpointsAsString) {
                return segmentsWithCreatedDates;
            }
            return segmentsWithCreatedDates.stream().filter(pair -> {
                for (Interval interval : intervals) {
                    if (!intervalMode.apply(interval, ((DataSegment)pair.lhs).getInterval())) continue;
                    return true;
                }
                return false;
            }).collect(Collectors.toList());
        });
    }

    @Override
    public List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval, @Nullable Integer limit, @Nullable DateTime maxUsedStatusLastUpdatedTime) {
        List matchingSegments = (List)this.connector.inReadOnlyTransaction((handle, status) -> {
            try (CloseableIterator<DataSegment> iterator = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit, null, null, maxUsedStatusLastUpdatedTime);){
                ImmutableList immutableList = ImmutableList.copyOf(iterator);
                return immutableList;
            }
        });
        log.info("Found [%,d] unused segments for datasource[%s] in interval[%s] with maxUsedStatusLastUpdatedTime[%s].", new Object[]{matchingSegments.size(), dataSource, interval, maxUsedStatusLastUpdatedTime});
        return matchingSegments;
    }

    @Override
    public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval) {
        Integer numSegmentsMarkedUnused = (Integer)this.connector.retryTransaction((handle, status) -> SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).markSegmentsUnused(dataSource, interval), 3, 10);
        log.info("Marked %,d segments unused for %s for interval %s.", new Object[]{numSegmentsMarkedUnused, dataSource, interval});
        return numSegmentsMarkedUnused;
    }

    @VisibleForTesting
    Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(Handle handle, String dataSource, Interval interval, Set<String> sequenceNamePrefixFilter) throws IOException {
        if (sequenceNamePrefixFilter.isEmpty()) {
            return Collections.emptyMap();
        }
        ArrayList<String> sequenceNamePrefixes = new ArrayList<String>(sequenceNamePrefixFilter);
        ArrayList<String> sequenceNamePrefixConditions = new ArrayList<String>();
        for (int i = 0; i < sequenceNamePrefixes.size(); ++i) {
            sequenceNamePrefixConditions.add(StringUtils.format((String)"(sequence_name LIKE :prefix%d)", (Object[])new Object[]{i}));
        }
        String sql = "SELECT sequence_name, payload FROM " + this.dbTables.getPendingSegmentsTable() + " WHERE dataSource = :dataSource AND start < :end" + StringUtils.format((String)" AND %1$send%1$s > :start", (Object[])new Object[]{this.connector.getQuoteString()}) + " AND ( " + String.join((CharSequence)" OR ", sequenceNamePrefixConditions) + " )";
        Query query = (Query)((Query)((Query)handle.createQuery(sql).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString());
        for (int i = 0; i < sequenceNamePrefixes.size(); ++i) {
            query.bind(StringUtils.format((String)"prefix%d", (Object[])new Object[]{i}), (String)sequenceNamePrefixes.get(i) + "%");
        }
        ResultIterator dbSegments = query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r)).iterator();
        HashMap<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = new HashMap<SegmentIdWithShardSpec, String>();
        while (dbSegments.hasNext()) {
            PendingSegmentsRecord record = (PendingSegmentsRecord)dbSegments.next();
            SegmentIdWithShardSpec identifier = (SegmentIdWithShardSpec)this.jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
            if (!interval.overlaps((ReadableInterval)identifier.getInterval())) continue;
            pendingSegmentToSequenceName.put(identifier, record.sequenceName);
        }
        dbSegments.close();
        return pendingSegmentToSequenceName;
    }

    private Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(Handle handle, String dataSource, Interval interval) throws IOException {
        ResultIterator dbSegments = ((Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT sequence_name, payload FROM %1$s WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()})).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r)).iterator();
        HashMap<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = new HashMap<SegmentIdWithShardSpec, String>();
        while (dbSegments.hasNext()) {
            PendingSegmentsRecord record = (PendingSegmentsRecord)dbSegments.next();
            SegmentIdWithShardSpec identifier = (SegmentIdWithShardSpec)this.jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
            if (!interval.overlaps((ReadableInterval)identifier.getInterval())) continue;
            pendingSegmentToSequenceName.put(identifier, record.sequenceName);
        }
        dbSegments.close();
        return pendingSegmentToSequenceName;
    }

    private SegmentTimeline getTimelineForIntervalsWithHandle(Handle handle, String dataSource, List<Interval> intervals) throws IOException {
        try (CloseableIterator<DataSegment> iterator = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).retrieveUsedSegments(dataSource, intervals);){
            SegmentTimeline segmentTimeline = SegmentTimeline.forSegments(iterator);
            return segmentTimeline;
        }
    }

    private Collection<DataSegment> retrieveAllUsedSegmentsForIntervalsWithHandle(Handle handle, String dataSource, List<Interval> intervals) throws IOException {
        try (CloseableIterator<DataSegment> iterator = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).retrieveUsedSegments(dataSource, intervals);){
            ArrayList<DataSegment> retVal = new ArrayList<DataSegment>();
            iterator.forEachRemaining(retVal::add);
            ArrayList<DataSegment> arrayList = retVal;
            return arrayList;
        }
    }

    @Override
    public Set<DataSegment> commitSegments(Set<DataSegment> segments) throws IOException {
        SegmentPublishResult result = this.commitSegmentsAndMetadata(segments, null, null);
        if (!result.isSuccess()) {
            throw new ISE("announceHistoricalSegments failed with null metadata, should not happen.", new Object[0]);
        }
        return result.getSegments();
    }

    @Override
    public SegmentPublishResult commitSegmentsAndMetadata(final Set<DataSegment> segments, final @Nullable DataSourceMetadata startMetadata, final @Nullable DataSourceMetadata endMetadata) throws IOException {
        if (segments.isEmpty()) {
            throw new IllegalArgumentException("segment set must not be empty");
        }
        final String dataSource = segments.iterator().next().getDataSource();
        for (DataSegment segment : segments) {
            if (dataSource.equals(segment.getDataSource())) continue;
            throw new IllegalArgumentException("segments must all be from the same dataSource");
        }
        if (startMetadata == null && endMetadata != null || startMetadata != null && endMetadata == null) {
            throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
        }
        final HashSet<DataSegment> usedSegments = new HashSet<DataSegment>();
        List segmentHolders = SegmentTimeline.forSegments(segments).lookupWithIncompletePartitions(Intervals.ETERNITY);
        for (TimelineObjectHolder holder : segmentHolders) {
            for (PartitionChunk chunk : holder.getObject()) {
                usedSegments.add((DataSegment)chunk.getObject());
            }
        }
        final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
        try {
            return this.connector.retryTransaction(new TransactionCallback<SegmentPublishResult>(){

                public SegmentPublishResult inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                    DataStoreMetadataUpdateResult result;
                    definitelyNotUpdated.set(false);
                    if (startMetadata != null && (result = IndexerSQLMetadataStorageCoordinator.this.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata)).isFailed()) {
                        transactionStatus.setRollbackOnly();
                        definitelyNotUpdated.set(true);
                        if (result.canRetry()) {
                            throw new RetryTransactionException(result.getErrorMsg());
                        }
                        throw new RuntimeException(result.getErrorMsg());
                    }
                    Set inserted = IndexerSQLMetadataStorageCoordinator.this.announceHistoricalSegmentBatch(handle, segments, usedSegments);
                    return SegmentPublishResult.ok((Set<DataSegment>)ImmutableSet.copyOf((Collection)inserted));
                }
            }, 3, this.getSqlMetadataMaxRetry());
        }
        catch (CallbackFailedException e) {
            if (definitelyNotUpdated.get()) {
                return SegmentPublishResult.fail(e.getMessage());
            }
            throw e;
        }
    }

    @Override
    public SegmentPublishResult commitReplaceSegments(Set<DataSegment> replaceSegments, Set<ReplaceTaskLock> locksHeldByReplaceTask) {
        this.verifySegmentsToCommit(replaceSegments);
        try {
            return (SegmentPublishResult)this.connector.retryTransaction((handle, transactionStatus) -> {
                HashSet<DataSegment> segmentsToInsert = new HashSet<DataSegment>(replaceSegments);
                segmentsToInsert.addAll(this.createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask));
                return SegmentPublishResult.ok(this.insertSegments(handle, segmentsToInsert));
            }, 3, this.getSqlMetadataMaxRetry());
        }
        catch (CallbackFailedException e) {
            return SegmentPublishResult.fail(e.getMessage());
        }
    }

    @Override
    public SegmentPublishResult commitAppendSegments(Set<DataSegment> appendSegments, Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock) {
        return this.commitAppendSegmentsAndMetadataInTransaction(appendSegments, appendSegmentToReplaceLock, null, null);
    }

    @Override
    public SegmentPublishResult commitAppendSegmentsAndMetadata(Set<DataSegment> appendSegments, Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata) {
        return this.commitAppendSegmentsAndMetadataInTransaction(appendSegments, appendSegmentToReplaceLock, startMetadata, endMetadata);
    }

    @Override
    public SegmentPublishResult commitMetadataOnly(final String dataSource, final DataSourceMetadata startMetadata, final DataSourceMetadata endMetadata) {
        if (dataSource == null) {
            throw new IllegalArgumentException("datasource name cannot be null");
        }
        if (startMetadata == null) {
            throw new IllegalArgumentException("start metadata cannot be null");
        }
        if (endMetadata == null) {
            throw new IllegalArgumentException("end metadata cannot be null");
        }
        final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
        try {
            return this.connector.retryTransaction(new TransactionCallback<SegmentPublishResult>(){

                public SegmentPublishResult inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                    definitelyNotUpdated.set(false);
                    DataStoreMetadataUpdateResult result = IndexerSQLMetadataStorageCoordinator.this.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata);
                    if (result.isFailed()) {
                        transactionStatus.setRollbackOnly();
                        definitelyNotUpdated.set(true);
                        if (result.canRetry()) {
                            throw new RetryTransactionException(result.getErrorMsg());
                        }
                        throw new RuntimeException(result.getErrorMsg());
                    }
                    return SegmentPublishResult.ok((Set<DataSegment>)ImmutableSet.of());
                }
            }, 3, this.getSqlMetadataMaxRetry());
        }
        catch (CallbackFailedException e) {
            if (definitelyNotUpdated.get()) {
                return SegmentPublishResult.fail(e.getMessage());
            }
            throw e;
        }
    }

    @VisibleForTesting
    public int getSqlMetadataMaxRetry() {
        return 10;
    }

    @Override
    public Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(String dataSource, Interval allocateInterval, boolean skipSegmentLineageCheck, List<SegmentCreateRequest> requests) {
        Preconditions.checkNotNull((Object)dataSource, (Object)"dataSource");
        Preconditions.checkNotNull((Object)allocateInterval, (Object)"interval");
        Interval interval = allocateInterval.withChronology((Chronology)ISOChronology.getInstanceUTC());
        return (Map)this.connector.retryWithHandle(handle -> this.allocatePendingSegments(handle, dataSource, interval, skipSegmentLineageCheck, requests));
    }

    @Override
    public SegmentIdWithShardSpec allocatePendingSegment(String dataSource, String sequenceName, @Nullable String previousSegmentId, Interval interval, PartialShardSpec partialShardSpec, String maxVersion, boolean skipSegmentLineageCheck) {
        Preconditions.checkNotNull((Object)dataSource, (Object)"dataSource");
        Preconditions.checkNotNull((Object)sequenceName, (Object)"sequenceName");
        Preconditions.checkNotNull((Object)interval, (Object)"interval");
        Preconditions.checkNotNull((Object)maxVersion, (Object)"version");
        Interval allocateInterval = interval.withChronology((Chronology)ISOChronology.getInstanceUTC());
        return (SegmentIdWithShardSpec)this.connector.retryWithHandle(handle -> {
            List existingChunks = this.getTimelineForIntervalsWithHandle(handle, dataSource, (List<Interval>)ImmutableList.of((Object)interval)).lookup(interval);
            if (existingChunks.size() > 1) {
                log.warn("Cannot allocate new segment for dataSource[%s], interval[%s] as it already has [%,d] versions.", new Object[]{dataSource, interval, existingChunks.size()});
                return null;
            }
            if (skipSegmentLineageCheck) {
                return this.allocatePendingSegment(handle, dataSource, sequenceName, allocateInterval, partialShardSpec, maxVersion, existingChunks);
            }
            return this.allocatePendingSegmentWithSegmentLineageCheck(handle, dataSource, sequenceName, previousSegmentId, allocateInterval, partialShardSpec, maxVersion, existingChunks);
        });
    }

    @Override
    public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(Set<DataSegment> replaceSegments, Set<String> activeRealtimeSequencePrefixes) {
        if (replaceSegments.isEmpty()) {
            return Collections.emptyMap();
        }
        HashMap<Interval, DataSegment> replaceIntervalToMaxId = new HashMap<Interval, DataSegment>();
        for (DataSegment segment : replaceSegments) {
            DataSegment committedMaxId = (DataSegment)replaceIntervalToMaxId.get(segment.getInterval());
            if (committedMaxId != null && committedMaxId.getShardSpec().getPartitionNum() >= segment.getShardSpec().getPartitionNum()) continue;
            replaceIntervalToMaxId.put(segment.getInterval(), segment);
        }
        String datasource = replaceSegments.iterator().next().getDataSource();
        return (Map)this.connector.retryWithHandle(handle -> this.upgradePendingSegments(handle, datasource, replaceIntervalToMaxId, activeRealtimeSequencePrefixes));
    }

    private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegments(Handle handle, String datasource, Map<Interval, DataSegment> replaceIntervalToMaxId, Set<String> activeRealtimeSequencePrefixes) throws IOException {
        HashMap<SegmentCreateRequest, SegmentIdWithShardSpec> newPendingSegmentVersions = new HashMap<SegmentCreateRequest, SegmentIdWithShardSpec>();
        HashMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec> pendingSegmentToNewId = new HashMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec>();
        for (Map.Entry<Interval, DataSegment> entry : replaceIntervalToMaxId.entrySet()) {
            Interval replaceInterval = entry.getKey();
            DataSegment maxSegmentId = entry.getValue();
            String replaceVersion = maxSegmentId.getVersion();
            int numCorePartitions = maxSegmentId.getShardSpec().getNumCorePartitions();
            int currentPartitionNumber = maxSegmentId.getShardSpec().getPartitionNum();
            Map<SegmentIdWithShardSpec, String> overlappingPendingSegments = this.getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval, activeRealtimeSequencePrefixes);
            for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment : overlappingPendingSegments.entrySet()) {
                String pendingSegmentSequence;
                SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey();
                if (!this.shouldUpgradePendingSegment(pendingSegmentId, pendingSegmentSequence = overlappingPendingSegment.getValue(), replaceInterval, replaceVersion)) continue;
                SegmentIdWithShardSpec newId = new SegmentIdWithShardSpec(datasource, replaceInterval, replaceVersion, (ShardSpec)new NumberedShardSpec(++currentPartitionNumber, numCorePartitions));
                newPendingSegmentVersions.put(new SegmentCreateRequest(UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion, pendingSegmentId.toString(), replaceVersion, (PartialShardSpec)NumberedPartialShardSpec.instance()), newId);
                pendingSegmentToNewId.put(pendingSegmentId, newId);
            }
        }
        int numInsertedPendingSegments = this.insertPendingSegmentsIntoMetastore(handle, newPendingSegmentVersions, datasource, false);
        log.info("Inserted total [%d] new versions for [%d] pending segments.", new Object[]{numInsertedPendingSegments, newPendingSegmentVersions.size()});
        return pendingSegmentToNewId;
    }

    private boolean shouldUpgradePendingSegment(SegmentIdWithShardSpec pendingSegmentId, String pendingSegmentSequenceName, Interval replaceInterval, String replaceVersion) {
        if (pendingSegmentId.getVersion().compareTo(replaceVersion) >= 0) {
            return false;
        }
        if (!replaceInterval.contains((ReadableInterval)pendingSegmentId.getInterval())) {
            return false;
        }
        return pendingSegmentSequenceName == null || !pendingSegmentSequenceName.startsWith(UPGRADED_PENDING_SEGMENT_PREFIX);
    }

    @Nullable
    private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck(Handle handle, String dataSource, String sequenceName, @Nullable String previousSegmentId, Interval interval, PartialShardSpec partialShardSpec, String maxVersion, List<TimelineObjectHolder<String, DataSegment>> existingChunks) throws IOException {
        String usedSegmentVersion;
        String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
        String sql = StringUtils.format((String)"SELECT payload FROM %s WHERE dataSource = :dataSource AND sequence_name = :sequence_name AND sequence_prev_id = :sequence_prev_id", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable()});
        Query query = (Query)((Query)((Query)handle.createQuery(sql).bind("dataSource", dataSource)).bind("sequence_name", sequenceName)).bind("sequence_prev_id", previousSegmentIdNotNull);
        CheckExistingSegmentIdResult result = this.findExistingPendingSegment((Query<Map<String, Object>>)query, interval, sequenceName, previousSegmentIdNotNull, usedSegmentVersion = existingChunks.isEmpty() ? null : (String)existingChunks.get(0).getVersion());
        if (result.found) {
            return result.segmentIdentifier;
        }
        SegmentIdWithShardSpec newIdentifier = this.createNewSegment(handle, dataSource, interval, partialShardSpec, maxVersion, existingChunks);
        if (newIdentifier == null) {
            return null;
        }
        String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(Hashing.sha1().newHasher().putBytes(StringUtils.toUtf8((String)sequenceName)).putByte((byte)-1).putBytes(StringUtils.toUtf8((String)previousSegmentIdNotNull)).putByte((byte)-1).putBytes(StringUtils.toUtf8((String)newIdentifier.getVersion())).hash().asBytes());
        this.insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, previousSegmentIdNotNull, sequenceName, sequenceNamePrevIdSha1);
        return newIdentifier;
    }

    private Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(Handle handle, String dataSource, Interval interval, boolean skipSegmentLineageCheck, List<SegmentCreateRequest> requests) throws IOException {
        List existingChunks = this.getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)).lookup(interval);
        if (existingChunks.size() > 1) {
            log.warn("Cannot allocate new segments for dataSource[%s], interval[%s] as interval already has [%,d] chunks.", new Object[]{dataSource, interval, existingChunks.size()});
            return Collections.emptyMap();
        }
        String existingVersion = existingChunks.isEmpty() ? null : (String)((TimelineObjectHolder)existingChunks.get(0)).getVersion();
        Map<SegmentCreateRequest, CheckExistingSegmentIdResult> existingSegmentIds = skipSegmentLineageCheck ? this.getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, existingVersion, requests) : this.getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, existingVersion, requests);
        HashMap<SegmentCreateRequest, SegmentIdWithShardSpec> allocatedSegmentIds = new HashMap<SegmentCreateRequest, SegmentIdWithShardSpec>();
        ArrayList<SegmentCreateRequest> requestsForNewSegments = new ArrayList<SegmentCreateRequest>();
        for (SegmentCreateRequest request : requests) {
            CheckExistingSegmentIdResult existingSegmentId = existingSegmentIds.get(request);
            if (existingSegmentId == null || !existingSegmentId.found) {
                requestsForNewSegments.add(request);
                continue;
            }
            if (existingSegmentId.segmentIdentifier != null) {
                log.info("Found valid existing segment [%s] for request.", new Object[]{existingSegmentId.segmentIdentifier});
                allocatedSegmentIds.put(request, existingSegmentId.segmentIdentifier);
                continue;
            }
            log.info("Found clashing existing segment [%s] for request.", new Object[]{existingSegmentId});
        }
        Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = this.createNewSegments(handle, dataSource, interval, skipSegmentLineageCheck, existingChunks, requestsForNewSegments);
        this.insertPendingSegmentsIntoMetastore(handle, createdSegments, dataSource, skipSegmentLineageCheck);
        allocatedSegmentIds.putAll(createdSegments);
        return allocatedSegmentIds;
    }

    private String getSequenceNameAndPrevIdSha(SegmentCreateRequest request, SegmentIdWithShardSpec pendingSegmentId, boolean skipSegmentLineageCheck) {
        Hasher hasher = Hashing.sha1().newHasher().putBytes(StringUtils.toUtf8((String)request.getSequenceName())).putByte((byte)-1);
        if (skipSegmentLineageCheck) {
            Interval interval = pendingSegmentId.getInterval();
            hasher.putLong(interval.getStartMillis()).putLong(interval.getEndMillis());
        } else {
            hasher.putBytes(StringUtils.toUtf8((String)request.getPreviousSegmentId()));
        }
        hasher.putByte((byte)-1);
        hasher.putBytes(StringUtils.toUtf8((String)pendingSegmentId.getVersion()));
        return BaseEncoding.base16().encode(hasher.hash().asBytes());
    }

    @Nullable
    private SegmentIdWithShardSpec allocatePendingSegment(Handle handle, String dataSource, String sequenceName, Interval interval, PartialShardSpec partialShardSpec, String maxVersion, List<TimelineObjectHolder<String, DataSegment>> existingChunks) throws IOException {
        String sql = StringUtils.format((String)"SELECT payload FROM %s WHERE dataSource = :dataSource AND sequence_name = :sequence_name AND start = :start AND %2$send%2$s = :end", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()});
        Query query = (Query)((Query)((Query)((Query)handle.createQuery(sql).bind("dataSource", dataSource)).bind("sequence_name", sequenceName)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString());
        CheckExistingSegmentIdResult result = this.findExistingPendingSegment((Query<Map<String, Object>>)query, interval, sequenceName, null, existingChunks.isEmpty() ? null : (String)existingChunks.get(0).getVersion());
        if (result.found) {
            return result.segmentIdentifier;
        }
        SegmentIdWithShardSpec newIdentifier = this.createNewSegment(handle, dataSource, interval, partialShardSpec, maxVersion, existingChunks);
        if (newIdentifier == null) {
            return null;
        }
        String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(Hashing.sha1().newHasher().putBytes(StringUtils.toUtf8((String)sequenceName)).putByte((byte)-1).putLong(interval.getStartMillis()).putLong(interval.getEndMillis()).putByte((byte)-1).putBytes(StringUtils.toUtf8((String)newIdentifier.getVersion())).hash().asBytes());
        this.insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
        log.info("Created new pending segment[%s] for datasource[%s], sequence[%s], interval[%s].", new Object[]{newIdentifier, dataSource, sequenceName, interval});
        return newIdentifier;
    }

    private Map<SegmentCreateRequest, CheckExistingSegmentIdResult> getExistingSegmentIdsSkipLineageCheck(Handle handle, String dataSource, Interval interval, String usedSegmentVersion, List<SegmentCreateRequest> requests) throws IOException {
        Query query = (Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT sequence_name, payload FROM %s WHERE dataSource = :dataSource AND start = :start AND %2$send%2$s = :end", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()})).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString());
        ResultIterator dbSegments = query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r)).iterator();
        HashMap<String, SegmentIdWithShardSpec> sequenceToSegmentId = new HashMap<String, SegmentIdWithShardSpec>();
        while (dbSegments.hasNext()) {
            PendingSegmentsRecord record = (PendingSegmentsRecord)dbSegments.next();
            SegmentIdWithShardSpec segmentId = (SegmentIdWithShardSpec)this.jsonMapper.readValue(record.getPayload(), SegmentIdWithShardSpec.class);
            if (usedSegmentVersion != null && !segmentId.getVersion().equals(usedSegmentVersion)) continue;
            sequenceToSegmentId.put(record.getSequenceName(), segmentId);
        }
        HashMap<SegmentCreateRequest, CheckExistingSegmentIdResult> requestToResult = new HashMap<SegmentCreateRequest, CheckExistingSegmentIdResult>();
        Iterator<SegmentCreateRequest> iterator = requests.iterator();
        while (iterator.hasNext()) {
            SegmentCreateRequest request;
            SegmentIdWithShardSpec segmentId = (SegmentIdWithShardSpec)sequenceToSegmentId.get((request = iterator.next()).getSequenceName());
            requestToResult.put(request, new CheckExistingSegmentIdResult(segmentId != null, segmentId));
        }
        return requestToResult;
    }

    private Map<SegmentCreateRequest, CheckExistingSegmentIdResult> getExistingSegmentIdsWithLineageCheck(Handle handle, String dataSource, Interval interval, String usedSegmentVersion, List<SegmentCreateRequest> requests) throws IOException {
        String sql = StringUtils.format((String)"SELECT payload FROM %s WHERE dataSource = :dataSource AND sequence_name = :sequence_name AND sequence_prev_id = :sequence_prev_id", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable()});
        HashMap<SegmentCreateRequest, CheckExistingSegmentIdResult> requestToResult = new HashMap<SegmentCreateRequest, CheckExistingSegmentIdResult>();
        for (SegmentCreateRequest request : requests) {
            CheckExistingSegmentIdResult result = this.findExistingPendingSegment((Query<Map<String, Object>>)((Query)((Query)((Query)handle.createQuery(sql).bind("dataSource", dataSource)).bind("sequence_name", request.getSequenceName())).bind("sequence_prev_id", request.getPreviousSegmentId())), interval, request.getSequenceName(), request.getPreviousSegmentId(), usedSegmentVersion);
            requestToResult.put(request, result);
        }
        return requestToResult;
    }

    private CheckExistingSegmentIdResult findExistingPendingSegment(Query<Map<String, Object>> query, Interval interval, String sequenceName, @Nullable String previousSegmentId, @Nullable String usedSegmentVersion) throws IOException {
        List records = query.map((ResultSetMapper)ByteArrayMapper.FIRST).list();
        if (records.isEmpty()) {
            return new CheckExistingSegmentIdResult(false, null);
        }
        for (byte[] record : records) {
            SegmentIdWithShardSpec pendingSegment = (SegmentIdWithShardSpec)this.jsonMapper.readValue(record, SegmentIdWithShardSpec.class);
            if (usedSegmentVersion != null && !pendingSegment.getVersion().equals(usedSegmentVersion)) continue;
            if (pendingSegment.getInterval().isEqual((ReadableInterval)interval)) {
                log.info("Found existing pending segment[%s] for sequence[%s], previous segment[%s], version[%s] in DB", new Object[]{pendingSegment, sequenceName, previousSegmentId, usedSegmentVersion});
                return new CheckExistingSegmentIdResult(true, pendingSegment);
            }
            log.warn("Cannot use existing pending segment [%s] for sequence[%s], previous segment[%s] in DB as it does not match requested interval[%s], version[%s].", new Object[]{pendingSegment, sequenceName, previousSegmentId, interval, usedSegmentVersion});
            return new CheckExistingSegmentIdResult(true, null);
        }
        return new CheckExistingSegmentIdResult(false, null);
    }

    private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(Set<DataSegment> appendSegments, Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock, @Nullable DataSourceMetadata startMetadata, @Nullable DataSourceMetadata endMetadata) {
        this.verifySegmentsToCommit(appendSegments);
        if (startMetadata == null && endMetadata != null || startMetadata != null && endMetadata == null) {
            throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
        }
        String dataSource = appendSegments.iterator().next().getDataSource();
        Set segmentIdsForNewVersions = (Set)this.connector.retryTransaction((handle, transactionStatus) -> this.createNewIdsForAppendSegments(handle, dataSource, appendSegments), 0, 10);
        HashSet<DataSegment> allSegmentsToInsert = new HashSet<DataSegment>(appendSegments);
        allSegmentsToInsert.addAll(segmentIdsForNewVersions);
        AtomicBoolean metadataNotUpdated = new AtomicBoolean(false);
        try {
            return (SegmentPublishResult)this.connector.retryTransaction((handle, transactionStatus) -> {
                DataStoreMetadataUpdateResult metadataUpdateResult;
                metadataNotUpdated.set(false);
                if (startMetadata != null && (metadataUpdateResult = this.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata)).isFailed()) {
                    transactionStatus.setRollbackOnly();
                    metadataNotUpdated.set(true);
                    if (metadataUpdateResult.canRetry()) {
                        throw new RetryTransactionException(metadataUpdateResult.getErrorMsg());
                    }
                    throw new RuntimeException(metadataUpdateResult.getErrorMsg());
                }
                this.insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
                return SegmentPublishResult.ok(this.insertSegments(handle, allSegmentsToInsert));
            }, 3, this.getSqlMetadataMaxRetry());
        }
        catch (CallbackFailedException e) {
            if (metadataNotUpdated.get()) {
                return SegmentPublishResult.fail(e.getMessage());
            }
            throw e;
        }
    }

    private int insertPendingSegmentsIntoMetastore(Handle handle, Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments, String dataSource, boolean skipSegmentLineageCheck) throws JsonProcessingException {
        PreparedBatch insertBatch = handle.prepareBatch(StringUtils.format((String)"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()}));
        HashMap segmentIdToRequest = new HashMap();
        createdSegments.forEach((request, segmentId) -> segmentIdToRequest.put(segmentId, request));
        for (Map.Entry entry : segmentIdToRequest.entrySet()) {
            SegmentCreateRequest request2 = (SegmentCreateRequest)entry.getValue();
            SegmentIdWithShardSpec segmentId2 = (SegmentIdWithShardSpec)entry.getKey();
            Interval interval = segmentId2.getInterval();
            ((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)insertBatch.add().bind("id", segmentId2.toString())).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).bind("sequence_name", request2.getSequenceName())).bind("sequence_prev_id", request2.getPreviousSegmentId())).bind("sequence_name_prev_id_sha1", this.getSequenceNameAndPrevIdSha(request2, segmentId2, skipSegmentLineageCheck))).bind("payload", this.jsonMapper.writeValueAsBytes((Object)segmentId2));
        }
        int[] updated = insertBatch.execute();
        return Arrays.stream(updated).sum();
    }

    private void insertPendingSegmentIntoMetastore(Handle handle, SegmentIdWithShardSpec newIdentifier, String dataSource, Interval interval, String previousSegmentId, String sequenceName, String sequenceNamePrevIdSha1) throws JsonProcessingException {
        ((Update)((Update)((Update)((Update)((Update)((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()})).bind("id", newIdentifier.toString())).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).bind("sequence_name", sequenceName)).bind("sequence_prev_id", previousSegmentId)).bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)).bind("payload", this.jsonMapper.writeValueAsBytes((Object)newIdentifier))).execute();
    }

    private Set<DataSegment> createNewIdsForAppendSegments(Handle handle, String dataSource, Set<DataSegment> segmentsToAppend) throws IOException {
        if (segmentsToAppend.isEmpty()) {
            return Collections.emptySet();
        }
        HashSet<Interval> appendIntervals = new HashSet<Interval>();
        TreeMap<String, Set<DataSegment>> appendVersionToSegments = new TreeMap<String, Set<DataSegment>>();
        for (DataSegment segment : segmentsToAppend) {
            appendIntervals.add(segment.getInterval());
            appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new HashSet()).add(segment);
        }
        Collection<DataSegment> overlappingSegments = this.retrieveUsedSegmentsForIntervals(dataSource, new ArrayList<Interval>(appendIntervals), Segments.INCLUDING_OVERSHADOWED);
        HashMap<String, Set> overlappingVersionToIntervals = new HashMap<String, Set>();
        HashMap overlappingIntervalToSegments = new HashMap();
        for (DataSegment segment : overlappingSegments) {
            overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet()).add(segment.getInterval());
            overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet()).add(segment);
        }
        HashSet<DataSegment> upgradedSegments = new HashSet<DataSegment>();
        for (Map.Entry entry : overlappingVersionToIntervals.entrySet()) {
            String upgradeVersion = (String)entry.getKey();
            Map<Interval, Set<DataSegment>> segmentsToUpgrade = this.getSegmentsWithVersionLowerThan(upgradeVersion, (Set)entry.getValue(), appendVersionToSegments);
            for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry : segmentsToUpgrade.entrySet()) {
                Interval upgradeInterval = upgradeEntry.getKey();
                Set<DataSegment> segmentsAlreadyOnVersion = overlappingIntervalToSegments.getOrDefault(upgradeInterval, Collections.emptySet()).stream().filter(s -> s.getVersion().equals(upgradeVersion)).collect(Collectors.toSet());
                Set<DataSegment> segmentsUpgradedToVersion = this.createNewIdsForAppendSegmentsWithVersion(handle, upgradeVersion, upgradeInterval, upgradeEntry.getValue(), segmentsAlreadyOnVersion);
                log.info("Upgraded [%d] segments to version[%s].", new Object[]{segmentsUpgradedToVersion.size(), upgradeVersion});
                upgradedSegments.addAll(segmentsUpgradedToVersion);
            }
        }
        return upgradedSegments;
    }

    private Map<Interval, Set<DataSegment>> getSegmentsWithVersionLowerThan(String cutoffVersion, Set<Interval> eligibleIntervals, TreeMap<String, Set<DataSegment>> versionToSegments) {
        Set eligibleSegments = versionToSegments.headMap(cutoffVersion).values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
        HashMap<Interval, Set<DataSegment>> eligibleIntervalToSegments = new HashMap<Interval, Set<DataSegment>>();
        block0: for (DataSegment segment : eligibleSegments) {
            Interval segmentInterval = segment.getInterval();
            for (Interval eligibleInterval : eligibleIntervals) {
                if (eligibleInterval.contains((ReadableInterval)segmentInterval)) {
                    eligibleIntervalToSegments.computeIfAbsent(eligibleInterval, itvl -> new HashSet()).add(segment);
                    continue block0;
                }
                if (!eligibleInterval.overlaps((ReadableInterval)segmentInterval)) continue;
                throw new ISE("Committed interval[%s] conflicts with interval[%s] of append segment[%s].", new Object[]{eligibleInterval, segmentInterval, segment.getId()});
            }
        }
        return eligibleIntervalToSegments;
    }

    private Set<DataSegment> createNewIdsForAppendSegmentsWithVersion(Handle handle, String upgradeVersion, Interval upgradeInterval, Set<DataSegment> segmentsToUpgrade, Set<DataSegment> committedSegments) throws IOException {
        SegmentIdWithShardSpec committedMaxId = null;
        for (DataSegment committedSegment : committedSegments) {
            if (committedMaxId != null && committedMaxId.getShardSpec().getPartitionNum() >= committedSegment.getShardSpec().getPartitionNum()) continue;
            committedMaxId = SegmentIdWithShardSpec.fromDataSegment(committedSegment);
        }
        String dataSource = segmentsToUpgrade.iterator().next().getDataSource();
        Set<SegmentIdWithShardSpec> pendingSegmentIds = this.getPendingSegmentsForIntervalWithHandle(handle, dataSource, upgradeInterval).keySet();
        HashSet<SegmentIdWithShardSpec> allAllocatedIds = new HashSet<SegmentIdWithShardSpec>(pendingSegmentIds);
        HashSet<DataSegment> newSegmentIds = new HashSet<DataSegment>();
        for (DataSegment segment : segmentsToUpgrade) {
            SegmentCreateRequest request = new SegmentCreateRequest(segment.getId() + "__" + upgradeVersion, null, upgradeVersion, (PartialShardSpec)NumberedPartialShardSpec.instance());
            SegmentIdWithShardSpec newId = this.createNewSegment(request, dataSource, upgradeInterval, upgradeVersion, committedMaxId, allAllocatedIds);
            allAllocatedIds.add(newId);
            newSegmentIds.add(DataSegment.builder((DataSegment)segment).interval(newId.getInterval()).version(newId.getVersion()).shardSpec(newId.getShardSpec()).build());
        }
        return newSegmentIds;
    }

    private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(Handle handle, String dataSource, Interval interval, boolean skipSegmentLineageCheck, List<TimelineObjectHolder<String, DataSegment>> existingChunks, List<SegmentCreateRequest> requests) throws IOException {
        String versionOfExistingChunk;
        if (requests.isEmpty()) {
            return Collections.emptyMap();
        }
        PartialShardSpec partialShardSpec = requests.get(0).getPartialShardSpec();
        SegmentIdWithShardSpec committedMaxId = null;
        if (existingChunks.isEmpty()) {
            versionOfExistingChunk = null;
        } else {
            TimelineObjectHolder existingHolder = (TimelineObjectHolder)Iterables.getOnlyElement(existingChunks);
            versionOfExistingChunk = (String)existingHolder.getVersion();
            for (DataSegment segment2 : FluentIterable.from((Iterable)existingHolder.getObject()).transform(PartitionChunk::getObject).filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
                if (committedMaxId != null && committedMaxId.getShardSpec().getPartitionNum() >= segment2.getShardSpec().getPartitionNum()) continue;
                committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment2);
            }
        }
        HashSet<SegmentIdWithShardSpec> pendingSegments = new HashSet<SegmentIdWithShardSpec>(this.getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet());
        HashMap<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = new HashMap<SegmentCreateRequest, SegmentIdWithShardSpec>();
        HashMap<UniqueAllocateRequest, SegmentIdWithShardSpec> uniqueRequestToSegment = new HashMap<UniqueAllocateRequest, SegmentIdWithShardSpec>();
        for (SegmentCreateRequest request : requests) {
            SegmentIdWithShardSpec createdSegment;
            UniqueAllocateRequest uniqueRequest = new UniqueAllocateRequest(interval, request, skipSegmentLineageCheck);
            if (uniqueRequestToSegment.containsKey(uniqueRequest)) {
                createdSegment = (SegmentIdWithShardSpec)uniqueRequestToSegment.get(uniqueRequest);
            } else {
                createdSegment = this.createNewSegment(request, dataSource, interval, versionOfExistingChunk, committedMaxId, pendingSegments);
                if (createdSegment != null) {
                    pendingSegments.add(createdSegment);
                    uniqueRequestToSegment.put(uniqueRequest, createdSegment);
                    log.info("Created new segment[%s]", new Object[]{createdSegment});
                }
            }
            if (createdSegment == null) continue;
            createdSegments.put(request, createdSegment);
        }
        log.info("Created [%d] new segments for [%d] allocate requests.", new Object[]{uniqueRequestToSegment.size(), requests.size()});
        return createdSegments;
    }

    private SegmentIdWithShardSpec createNewSegment(SegmentCreateRequest request, String dataSource, Interval interval, String versionOfExistingChunk, SegmentIdWithShardSpec committedMaxId, Set<SegmentIdWithShardSpec> pendingSegments) {
        PartialShardSpec partialShardSpec = request.getPartialShardSpec();
        String existingVersion = request.getVersion();
        HashSet<SegmentIdWithShardSpec> mutablePendingSegments = new HashSet<SegmentIdWithShardSpec>(pendingSegments);
        if (committedMaxId != null) {
            mutablePendingSegments.add(committedMaxId);
        }
        SegmentIdWithShardSpec overallMaxId = mutablePendingSegments.stream().filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)).filter(id -> versionOfExistingChunk == null || id.getVersion().equals(versionOfExistingChunk)).max(Comparator.comparing(SegmentIdWithShardSpec::getVersion).thenComparing(id -> id.getShardSpec().getPartitionNum())).orElse(null);
        String newSegmentVersion = versionOfExistingChunk != null ? versionOfExistingChunk : (overallMaxId != null ? overallMaxId.getVersion() : null);
        if (overallMaxId == null) {
            int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() ? 32768 : 0;
            String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
            return new SegmentIdWithShardSpec(dataSource, interval, version, partialShardSpec.complete(this.jsonMapper, newPartitionId, 0));
        }
        if (!overallMaxId.getInterval().equals((Object)interval)) {
            log.warn("Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", new Object[]{dataSource, interval, existingVersion, overallMaxId});
            return null;
        }
        if (committedMaxId != null && committedMaxId.getShardSpec().getNumCorePartitions() == -1) {
            log.warn("Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", new Object[]{committedMaxId, committedMaxId.getShardSpec()});
            return null;
        }
        return new SegmentIdWithShardSpec(dataSource, interval, (String)Preconditions.checkNotNull((Object)newSegmentVersion, (Object)"newSegmentVersion"), partialShardSpec.complete(this.jsonMapper, overallMaxId.getShardSpec().getPartitionNum() + 1, committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()));
    }

    @Nullable
    private SegmentIdWithShardSpec createNewSegment(Handle handle, String dataSource, Interval interval, PartialShardSpec partialShardSpec, String existingVersion, List<TimelineObjectHolder<String, DataSegment>> existingChunks) throws IOException {
        String versionOfExistingChunk;
        SegmentIdWithShardSpec committedMaxId = null;
        if (existingChunks.isEmpty()) {
            versionOfExistingChunk = null;
        } else {
            TimelineObjectHolder existingHolder = (TimelineObjectHolder)Iterables.getOnlyElement(existingChunks);
            versionOfExistingChunk = (String)existingHolder.getVersion();
            for (DataSegment segment2 : FluentIterable.from((Iterable)existingHolder.getObject()).transform(PartitionChunk::getObject).filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
                if (committedMaxId != null && committedMaxId.getShardSpec().getPartitionNum() >= segment2.getShardSpec().getPartitionNum()) continue;
                committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment2);
            }
        }
        HashSet<SegmentIdWithShardSpec> pendings = new HashSet<SegmentIdWithShardSpec>(this.getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet());
        if (committedMaxId != null) {
            pendings.add(committedMaxId);
        }
        SegmentIdWithShardSpec overallMaxId = pendings.stream().filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)).filter(id -> versionOfExistingChunk == null || id.getVersion().equals(versionOfExistingChunk)).max(Comparator.comparing(SegmentIdWithShardSpec::getVersion).thenComparing(id -> id.getShardSpec().getPartitionNum())).orElse(null);
        String newSegmentVersion = versionOfExistingChunk != null ? versionOfExistingChunk : (overallMaxId != null ? overallMaxId.getVersion() : null);
        if (overallMaxId == null) {
            int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() ? 32768 : 0;
            String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
            return new SegmentIdWithShardSpec(dataSource, interval, version, partialShardSpec.complete(this.jsonMapper, newPartitionId, 0));
        }
        if (!overallMaxId.getInterval().equals((Object)interval)) {
            log.warn("Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", new Object[]{dataSource, interval, existingVersion, overallMaxId});
            return null;
        }
        if (committedMaxId != null && committedMaxId.getShardSpec().getNumCorePartitions() == -1) {
            log.warn("Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", new Object[]{committedMaxId, committedMaxId.getShardSpec()});
            return null;
        }
        return new SegmentIdWithShardSpec(dataSource, interval, (String)Preconditions.checkNotNull((Object)newSegmentVersion, (Object)"newSegmentVersion"), partialShardSpec.complete(this.jsonMapper, overallMaxId.getShardSpec().getPartitionNum() + 1, committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()));
    }

    @Override
    public int deletePendingSegmentsCreatedInInterval(String dataSource, Interval deleteInterval) {
        return (Integer)this.connector.getDBI().inTransaction((handle, status) -> ((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"DELETE FROM %s WHERE datasource = :dataSource AND created_date >= :start AND created_date < :end", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable()})).bind("dataSource", dataSource)).bind("start", deleteInterval.getStart().toString())).bind("end", deleteInterval.getEnd().toString())).execute());
    }

    @Override
    public int deletePendingSegments(String dataSource) {
        return (Integer)this.connector.getDBI().inTransaction((handle, status) -> ((Update)handle.createStatement(StringUtils.format((String)"DELETE FROM %s WHERE datasource = :dataSource", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable()})).bind("dataSource", dataSource)).execute());
    }

    private Set<DataSegment> announceHistoricalSegmentBatch(Handle handle, Set<DataSegment> segments, Set<DataSegment> usedSegments) throws IOException {
        HashSet<DataSegment> toInsertSegments = new HashSet<DataSegment>();
        try {
            Set<String> existedSegments = this.segmentExistsBatch(handle, segments);
            log.info("Found these segments already exist in DB: %s", new Object[]{existedSegments});
            for (DataSegment segment : segments) {
                if (existedSegments.contains(segment.getId().toString())) continue;
                toInsertSegments.add(segment);
            }
            List partitionedSegments = Lists.partition(new ArrayList(toInsertSegments), (int)100);
            PreparedBatch preparedBatch = handle.prepareBatch(this.buildSqlToInsertSegments());
            for (List partition : partitionedSegments) {
                for (DataSegment segment : partition) {
                    String now = DateTimes.nowUtc().toString();
                    ((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)preparedBatch.add().bind("id", segment.getId().toString())).bind("dataSource", segment.getDataSource())).bind("created_date", now)).bind("start", segment.getInterval().getStart().toString())).bind("end", segment.getInterval().getEnd().toString())).bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec))).bind("version", segment.getVersion())).bind("used", usedSegments.contains(segment))).bind("payload", this.jsonMapper.writeValueAsBytes((Object)segment))).bind("used_status_last_updated", now);
                }
                int[] affectedRows = preparedBatch.execute();
                boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
                if (succeeded) {
                    log.infoSegments((Collection)partition, "Published segments to DB");
                    continue;
                }
                List failedToPublish = IntStream.range(0, partition.size()).filter(i -> affectedRows[i] != 1).mapToObj(partition::get).collect(Collectors.toList());
                throw new ISE("Failed to publish segments to DB: %s", new Object[]{SegmentUtils.commaSeparatedIdentifiers(failedToPublish)});
            }
        }
        catch (Exception e) {
            log.errorSegments(segments, "Exception inserting segments");
            throw e;
        }
        return toInsertSegments;
    }

    private Set<DataSegment> createNewIdsOfAppendSegmentsAfterReplace(Handle handle, Set<DataSegment> replaceSegments, Set<ReplaceTaskLock> locksHeldByReplaceTask) {
        if (replaceSegments.isEmpty() || locksHeldByReplaceTask.isEmpty()) {
            return Collections.emptySet();
        }
        HashMap<Interval, Integer> intervalToNumCorePartitions = new HashMap<Interval, Integer>();
        HashMap<Interval, Integer> intervalToCurrentPartitionNum = new HashMap<Interval, Integer>();
        for (DataSegment segment : replaceSegments) {
            intervalToNumCorePartitions.put(segment.getInterval(), segment.getShardSpec().getNumCorePartitions());
            int partitionNum = segment.getShardSpec().getPartitionNum();
            intervalToCurrentPartitionNum.compute(segment.getInterval(), (i, value) -> value == null ? partitionNum : Math.max(value, partitionNum));
        }
        String taskId = locksHeldByReplaceTask.stream().map(ReplaceTaskLock::getSupervisorTaskId).findFirst().orElse(null);
        Map<String, String> upgradeSegmentToLockVersion = this.getAppendSegmentsCommittedDuringTask(handle, taskId);
        List<DataSegment> segmentsToUpgrade = this.retrieveSegmentsById(handle, upgradeSegmentToLockVersion.keySet());
        if (segmentsToUpgrade.isEmpty()) {
            return Collections.emptySet();
        }
        Set replaceIntervals = intervalToNumCorePartitions.keySet();
        HashSet<DataSegment> upgradedSegments = new HashSet<DataSegment>();
        for (DataSegment oldSegment : segmentsToUpgrade) {
            Interval oldInterval = oldSegment.getInterval();
            Interval newInterval = null;
            for (Interval replaceInterval : replaceIntervals) {
                if (replaceInterval.contains((ReadableInterval)oldInterval)) {
                    newInterval = replaceInterval;
                    break;
                }
                if (!replaceInterval.overlaps((ReadableInterval)oldInterval)) continue;
                throw new ISE("Incompatible segment intervals for commit: [%s] and [%s].", new Object[]{oldInterval, replaceInterval});
            }
            if (newInterval == null) {
                newInterval = oldInterval;
            }
            int partitionNum = intervalToCurrentPartitionNum.compute(newInterval, (i, value) -> value == null ? 0 : value + 1);
            int numCorePartitions = (Integer)intervalToNumCorePartitions.get(newInterval);
            NumberedShardSpec shardSpec = new NumberedShardSpec(partitionNum, numCorePartitions);
            String lockVersion = upgradeSegmentToLockVersion.get(oldSegment.getId().toString());
            upgradedSegments.add(DataSegment.builder((DataSegment)oldSegment).interval(newInterval).version(lockVersion).shardSpec((ShardSpec)shardSpec).build());
        }
        return upgradedSegments;
    }

    private void verifySegmentsToCommit(Collection<DataSegment> segments) {
        if (segments.isEmpty()) {
            throw new IllegalArgumentException("No segment to commit");
        }
        String dataSource = segments.iterator().next().getDataSource();
        for (DataSegment segment : segments) {
            if (dataSource.equals(segment.getDataSource())) continue;
            throw new IllegalArgumentException("Segments to commit must all belong to the same datasource");
        }
    }

    private Set<DataSegment> insertSegments(Handle handle, Set<DataSegment> segments) throws IOException {
        Set<String> existingSegmentIds = this.segmentExistsBatch(handle, segments);
        Set<DataSegment> segmentsToInsert = segments.stream().filter(s -> !existingSegmentIds.contains(s.getId().toString())).collect(Collectors.toSet());
        List partitionedSegments = Lists.partition(new ArrayList(segmentsToInsert), (int)100);
        PreparedBatch batch = handle.prepareBatch(this.buildSqlToInsertSegments());
        for (List partition : partitionedSegments) {
            for (DataSegment segment : partition) {
                String now = DateTimes.nowUtc().toString();
                ((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)batch.add().bind("id", segment.getId().toString())).bind("dataSource", segment.getDataSource())).bind("created_date", now)).bind("start", segment.getInterval().getStart().toString())).bind("end", segment.getInterval().getEnd().toString())).bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec))).bind("version", segment.getVersion())).bind("used", true)).bind("payload", this.jsonMapper.writeValueAsBytes((Object)segment))).bind("used_status_last_updated", now);
            }
            int[] affectedRows = batch.execute();
            ArrayList<DataSegment> failedInserts = new ArrayList<DataSegment>();
            for (int i = 0; i < partition.size(); ++i) {
                if (affectedRows[i] == 1) continue;
                failedInserts.add((DataSegment)partition.get(i));
            }
            if (failedInserts.isEmpty()) {
                log.infoSegments((Collection)partition, "Published segments to DB");
                continue;
            }
            throw new ISE("Failed to publish segments to DB: %s", new Object[]{SegmentUtils.commaSeparatedIdentifiers(failedInserts)});
        }
        return segmentsToInsert;
    }

    private void insertIntoUpgradeSegmentsTable(Handle handle, Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock) {
        if (segmentToReplaceLock.isEmpty()) {
            return;
        }
        PreparedBatch batch = handle.prepareBatch(StringUtils.format((String)"INSERT INTO %1$s (task_id, segment_id, lock_version) VALUES (:task_id, :segment_id, :lock_version)", (Object[])new Object[]{this.dbTables.getUpgradeSegmentsTable()}));
        List partitions = Lists.partition(new ArrayList<Map.Entry<DataSegment, ReplaceTaskLock>>(segmentToReplaceLock.entrySet()), (int)100);
        for (List partition : partitions) {
            for (Map.Entry entry : partition) {
                DataSegment segment = (DataSegment)entry.getKey();
                ReplaceTaskLock lock = (ReplaceTaskLock)entry.getValue();
                ((PreparedBatchPart)((PreparedBatchPart)batch.add().bind("task_id", lock.getSupervisorTaskId())).bind("segment_id", segment.getId().toString())).bind("lock_version", lock.getVersion());
            }
            int[] affectedAppendRows = batch.execute();
            ArrayList<DataSegment> failedInserts = new ArrayList<DataSegment>();
            for (int i = 0; i < partition.size(); ++i) {
                if (affectedAppendRows[i] == 1) continue;
                failedInserts.add((DataSegment)((Map.Entry)partition.get(i)).getKey());
            }
            if (failedInserts.size() <= 0) continue;
            throw new ISE("Failed to insert upgrade segments in DB: %s", new Object[]{SegmentUtils.commaSeparatedIdentifiers(failedInserts)});
        }
    }

    private List<DataSegment> retrieveSegmentsById(Handle handle, Set<String> segmentIds) {
        if (segmentIds.isEmpty()) {
            return Collections.emptyList();
        }
        String segmentIdCsv = segmentIds.stream().map(id -> "'" + id + "'").collect(Collectors.joining(","));
        ResultIterator resultIterator = handle.createQuery(StringUtils.format((String)"SELECT payload FROM %s WHERE id in (%s)", (Object[])new Object[]{this.dbTables.getSegmentsTable(), segmentIdCsv})).setFetchSize(this.connector.getStreamingFetchSize()).map((index, r, ctx) -> (DataSegment)JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])r.getBytes(1), DataSegment.class)).iterator();
        return Lists.newArrayList((Iterator)resultIterator);
    }

    private String buildSqlToInsertSegments() {
        return StringUtils.format((String)"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload, used_status_last_updated) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload, :used_status_last_updated)", (Object[])new Object[]{this.dbTables.getSegmentsTable(), this.connector.getQuoteString()});
    }

    private Map<String, String> getAppendSegmentsCommittedDuringTask(Handle handle, String taskId) {
        String sql = StringUtils.format((String)"SELECT segment_id, lock_version FROM %1$s WHERE task_id = :task_id", (Object[])new Object[]{this.dbTables.getUpgradeSegmentsTable()});
        ResultIterator resultIterator = ((Query)handle.createQuery(sql).bind("task_id", taskId)).map((index, r, ctx) -> Pair.of((Object)r.getString("segment_id"), (Object)r.getString("lock_version"))).iterator();
        HashMap<String, String> segmentIdToLockVersion = new HashMap<String, String>();
        while (resultIterator.hasNext()) {
            Pair result = (Pair)resultIterator.next();
            segmentIdToLockVersion.put((String)result.lhs, (String)result.rhs);
        }
        return segmentIdToLockVersion;
    }

    private Set<String> segmentExistsBatch(Handle handle, Set<DataSegment> segments) {
        HashSet<String> existedSegments = new HashSet<String>();
        List segmentsLists = Lists.partition(new ArrayList<DataSegment>(segments), (int)100);
        for (List segmentList : segmentsLists) {
            String segmentIds = segmentList.stream().map(segment -> "'" + StringEscapeUtils.escapeSql((String)segment.getId().toString()) + "'").collect(Collectors.joining(","));
            List existIds = handle.createQuery(StringUtils.format((String)"SELECT id FROM %s WHERE id in (%s)", (Object[])new Object[]{this.dbTables.getSegmentsTable(), segmentIds})).mapTo(String.class).list();
            existedSegments.addAll(existIds);
        }
        return existedSegments;
    }

    @Override
    @Nullable
    public DataSourceMetadata retrieveDataSourceMetadata(String dataSource) {
        byte[] bytes = this.connector.lookup(this.dbTables.getDataSourceTable(), "dataSource", "commit_metadata_payload", dataSource);
        if (bytes == null) {
            return null;
        }
        return (DataSourceMetadata)JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])bytes, DataSourceMetadata.class);
    }

    @Nullable
    private byte[] retrieveDataSourceMetadataWithHandleAsBytes(Handle handle, String dataSource) {
        return this.connector.lookupWithHandle(handle, this.dbTables.getDataSourceTable(), "dataSource", "commit_metadata_payload", dataSource);
    }

    protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(Handle handle, String dataSource, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata) throws IOException {
        DataStoreMetadataUpdateResult retVal;
        boolean startMetadataMatchesExisting;
        DataSourceMetadata oldCommitMetadataFromDb;
        String oldCommitMetadataSha1FromDb;
        Preconditions.checkNotNull((Object)dataSource, (Object)"dataSource");
        Preconditions.checkNotNull((Object)startMetadata, (Object)"startMetadata");
        Preconditions.checkNotNull((Object)endMetadata, (Object)"endMetadata");
        byte[] oldCommitMetadataBytesFromDb = this.retrieveDataSourceMetadataWithHandleAsBytes(handle, dataSource);
        if (oldCommitMetadataBytesFromDb == null) {
            oldCommitMetadataSha1FromDb = null;
            oldCommitMetadataFromDb = null;
        } else {
            oldCommitMetadataSha1FromDb = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes());
            oldCommitMetadataFromDb = (DataSourceMetadata)this.jsonMapper.readValue(oldCommitMetadataBytesFromDb, DataSourceMetadata.class);
        }
        int startMetadataGreaterThanExisting = 0;
        if (oldCommitMetadataFromDb == null) {
            startMetadataMatchesExisting = startMetadata.isValidStart();
            startMetadataGreaterThanExisting = 1;
        } else {
            if (startMetadata instanceof Comparable) {
                startMetadataGreaterThanExisting = ((Comparable)((Object)startMetadata.asStartMetadata())).compareTo(oldCommitMetadataFromDb.asStartMetadata());
            }
            startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
        }
        if (startMetadataGreaterThanExisting == 1 && !startMetadataMatchesExisting) {
            log.info("Failed to update the metadata Store. The new start metadata: [%s] is ahead of last commited end state: [%s].", new Object[]{startMetadata, oldCommitMetadataFromDb});
            return new DataStoreMetadataUpdateResult(true, false, "Failed to update the metadata Store. The new start metadata is ahead of last commited end state.", new Object[0]);
        }
        if (!startMetadataMatchesExisting) {
            return new DataStoreMetadataUpdateResult(true, false, StringUtils.format((String)"Inconsistent metadata state. This can happen if you update input topic in a spec without changing the supervisor name. Stored state: [%s], Target state: [%s].", (Object[])new Object[]{oldCommitMetadataFromDb, startMetadata}), new Object[0]);
        }
        DataSourceMetadata newCommitMetadata = oldCommitMetadataFromDb == null ? endMetadata : oldCommitMetadataFromDb.plus(endMetadata);
        byte[] newCommitMetadataBytes = this.jsonMapper.writeValueAsBytes((Object)newCommitMetadata);
        String newCommitMetadataSha1 = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes());
        if (oldCommitMetadataBytesFromDb == null) {
            int numRows = ((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", (Object[])new Object[]{this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("commit_metadata_payload", newCommitMetadataBytes)).bind("commit_metadata_sha1", newCommitMetadataSha1)).execute();
            retVal = numRows == 1 ? DataStoreMetadataUpdateResult.SUCCESS : new DataStoreMetadataUpdateResult(true, true, "Failed to insert metadata for datasource [%s]", dataSource);
        } else {
            int numRows = ((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET commit_metadata_payload = :new_commit_metadata_payload, commit_metadata_sha1 = :new_commit_metadata_sha1 WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", (Object[])new Object[]{this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb)).bind("new_commit_metadata_payload", newCommitMetadataBytes)).bind("new_commit_metadata_sha1", newCommitMetadataSha1)).execute();
            DataStoreMetadataUpdateResult dataStoreMetadataUpdateResult = retVal = numRows == 1 ? DataStoreMetadataUpdateResult.SUCCESS : new DataStoreMetadataUpdateResult(true, true, "Failed to update metadata for datasource [%s]", dataSource);
        }
        if (retVal.isSuccess()) {
            log.info("Updated metadata from[%s] to[%s].", new Object[]{oldCommitMetadataFromDb, newCommitMetadata});
        } else {
            log.info("Not updating metadata, compare-and-swap failure.", new Object[0]);
        }
        return retVal;
    }

    @Override
    public boolean deleteDataSourceMetadata(final String dataSource) {
        return this.connector.retryWithHandle(new HandleCallback<Boolean>(){

            public Boolean withHandle(Handle handle) {
                int rows = ((Update)handle.createStatement(StringUtils.format((String)"DELETE from %s WHERE dataSource = :dataSource", (Object[])new Object[]{IndexerSQLMetadataStorageCoordinator.this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).execute();
                return rows > 0;
            }
        });
    }

    @Override
    public boolean resetDataSourceMetadata(final String dataSource, DataSourceMetadata dataSourceMetadata) throws IOException {
        final byte[] newCommitMetadataBytes = this.jsonMapper.writeValueAsBytes((Object)dataSourceMetadata);
        final String newCommitMetadataSha1 = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes());
        return this.connector.retryWithHandle(new HandleCallback<Boolean>(){

            public Boolean withHandle(Handle handle) {
                int numRows = ((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET commit_metadata_payload = :new_commit_metadata_payload, commit_metadata_sha1 = :new_commit_metadata_sha1 WHERE dataSource = :dataSource", (Object[])new Object[]{IndexerSQLMetadataStorageCoordinator.this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("new_commit_metadata_payload", newCommitMetadataBytes)).bind("new_commit_metadata_sha1", newCommitMetadataSha1)).execute();
                return numRows == 1;
            }
        });
    }

    @Override
    public void updateSegmentMetadata(final Set<DataSegment> segments) {
        this.connector.getDBI().inTransaction((TransactionCallback)new TransactionCallback<Void>(){

            public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                for (DataSegment segment : segments) {
                    IndexerSQLMetadataStorageCoordinator.this.updatePayload(handle, segment);
                }
                return null;
            }
        });
    }

    @Override
    public void deleteSegments(Set<DataSegment> segments) {
        if (segments.isEmpty()) {
            log.info("No segments to delete.", new Object[0]);
            return;
        }
        String deleteSql = StringUtils.format((String)"DELETE from %s WHERE id = :id", (Object[])new Object[]{this.dbTables.getSegmentsTable()});
        String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get();
        List ids = segments.stream().map(s -> s.getId().toString()).collect(Collectors.toList());
        int numDeletedSegments = (Integer)this.connector.getDBI().inTransaction((handle, transactionStatus) -> {
            PreparedBatch batch = handle.prepareBatch(deleteSql);
            for (String id : ids) {
                ((PreparedBatch)batch.bind("id", id)).add();
            }
            int[] deletedRows = batch.execute();
            return Arrays.stream(deletedRows).sum();
        });
        log.debugSegments(segments, "Delete the metadata of segments");
        log.info("Deleted [%d] segments from metadata storage for dataSource [%s].", new Object[]{numDeletedSegments, dataSource});
    }

    private void updatePayload(Handle handle, DataSegment segment) throws IOException {
        try {
            ((Update)((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET payload = :payload WHERE id = :id", (Object[])new Object[]{this.dbTables.getSegmentsTable()})).bind("id", segment.getId().toString())).bind("payload", this.jsonMapper.writeValueAsBytes((Object)segment))).execute();
        }
        catch (IOException e) {
            log.error((Throwable)e, "Exception inserting into DB", new Object[0]);
            throw e;
        }
    }

    @Override
    public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata) {
        return 1 == (Integer)this.connector.getDBI().inTransaction((handle, status) -> ((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", (Object[])new Object[]{this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("commit_metadata_payload", this.jsonMapper.writeValueAsBytes((Object)metadata))).bind("commit_metadata_sha1", BaseEncoding.base16().encode(Hashing.sha1().hashBytes(this.jsonMapper.writeValueAsBytes((Object)metadata)).asBytes()))).execute());
    }

    @Override
    public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set<String> excludeDatasources) {
        DateTime dateTime = DateTimes.utc((long)timestamp);
        List datasourcesToDelete = (List)this.connector.getDBI().withHandle(handle -> handle.createQuery(StringUtils.format((String)"SELECT dataSource FROM %1$s WHERE created_date < '%2$s'", (Object[])new Object[]{this.dbTables.getDataSourceTable(), dateTime.toString()})).mapTo(String.class).list());
        datasourcesToDelete.removeAll(excludeDatasources);
        return (Integer)this.connector.getDBI().withHandle(handle -> {
            PreparedBatch batch = handle.prepareBatch(StringUtils.format((String)"DELETE FROM %1$s WHERE dataSource = :dataSource AND created_date < '%2$s'", (Object[])new Object[]{this.dbTables.getDataSourceTable(), dateTime.toString()}));
            for (String datasource : datasourcesToDelete) {
                ((PreparedBatch)batch.bind("dataSource", datasource)).add();
            }
            int[] result = batch.execute();
            return IntStream.of(result).sum();
        });
    }

    @Override
    public DataSegment retrieveSegmentForId(String id, boolean includeUnused) {
        return (DataSegment)this.connector.retryTransaction((handle, status) -> {
            if (includeUnused) {
                return SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).retrieveSegmentForId(id);
            }
            return SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).retrieveUsedSegmentForId(id);
        }, 3, 10);
    }

    @Override
    public int deleteUpgradeSegmentsForTask(String taskId) {
        return (Integer)this.connector.getDBI().inTransaction((handle, status) -> ((Update)handle.createStatement(StringUtils.format((String)"DELETE FROM %s WHERE task_id = :task_id", (Object[])new Object[]{this.dbTables.getUpgradeSegmentsTable()})).bind("task_id", taskId)).execute());
    }

    public static class DataStoreMetadataUpdateResult {
        private final boolean failed;
        private final boolean canRetry;
        @Nullable
        private final String errorMsg;
        public static final DataStoreMetadataUpdateResult SUCCESS = new DataStoreMetadataUpdateResult(false, false, null, new Object[0]);

        DataStoreMetadataUpdateResult(boolean failed, boolean canRetry, @Nullable String errorMsg, Object ... errorFormatArgs) {
            this.failed = failed;
            this.canRetry = canRetry;
            this.errorMsg = null == errorMsg ? null : StringUtils.format((String)errorMsg, (Object[])errorFormatArgs);
        }

        public boolean isFailed() {
            return this.failed;
        }

        public boolean isSuccess() {
            return !this.failed;
        }

        public boolean canRetry() {
            return this.canRetry;
        }

        @Nullable
        public String getErrorMsg() {
            return this.errorMsg;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            DataStoreMetadataUpdateResult that = (DataStoreMetadataUpdateResult)o;
            return this.failed == that.failed && this.canRetry == that.canRetry && Objects.equals(this.errorMsg, that.errorMsg);
        }

        public int hashCode() {
            return Objects.hash(this.failed, this.canRetry, this.errorMsg);
        }

        public String toString() {
            return "DataStoreMetadataUpdateResult{failed=" + this.failed + ", canRetry=" + this.canRetry + ", errorMsg='" + this.errorMsg + '\'' + '}';
        }
    }

    private static class PendingSegmentsRecord {
        private final String sequenceName;
        private final byte[] payload;

        static PendingSegmentsRecord fromResultSet(ResultSet resultSet) {
            try {
                return new PendingSegmentsRecord(resultSet.getString(1), resultSet.getBytes(2));
            }
            catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }

        PendingSegmentsRecord(String sequenceName, byte[] payload) {
            this.payload = payload;
            this.sequenceName = sequenceName;
        }

        public byte[] getPayload() {
            return this.payload;
        }

        public String getSequenceName() {
            return this.sequenceName;
        }
    }

    private static class UniqueAllocateRequest {
        private final Interval interval;
        private final String previousSegmentId;
        private final String sequenceName;
        private final boolean skipSegmentLineageCheck;
        private final int hashCode;

        public UniqueAllocateRequest(Interval interval, SegmentCreateRequest request, boolean skipSegmentLineageCheck) {
            this.interval = interval;
            this.sequenceName = request.getSequenceName();
            this.previousSegmentId = request.getPreviousSegmentId();
            this.skipSegmentLineageCheck = skipSegmentLineageCheck;
            this.hashCode = Objects.hash(interval, this.sequenceName, this.previousSegmentId, skipSegmentLineageCheck);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            UniqueAllocateRequest that = (UniqueAllocateRequest)o;
            return this.skipSegmentLineageCheck == that.skipSegmentLineageCheck && Objects.equals(this.interval, that.interval) && Objects.equals(this.sequenceName, that.sequenceName) && Objects.equals(this.previousSegmentId, that.previousSegmentId);
        }

        public int hashCode() {
            return this.hashCode;
        }
    }

    private static class CheckExistingSegmentIdResult {
        private final boolean found;
        @Nullable
        private final SegmentIdWithShardSpec segmentIdentifier;

        CheckExistingSegmentIdResult(boolean found, @Nullable SegmentIdWithShardSpec segmentIdentifier) {
            this.found = found;
            this.segmentIdentifier = segmentIdentifier;
        }
    }
}

