/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.metadata;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.RetryTransactionException;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInterval;
import org.joda.time.chrono.ISOChronology;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.PreparedBatchPart;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;

public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStorageCoordinator {
    private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class);
    private static final int MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE = 100;
    private final ObjectMapper jsonMapper;
    private final MetadataStorageTablesConfig dbTables;
    private final SQLMetadataConnector connector;

    @Inject
    public IndexerSQLMetadataStorageCoordinator(ObjectMapper jsonMapper, MetadataStorageTablesConfig dbTables, SQLMetadataConnector connector) {
        this.jsonMapper = jsonMapper;
        this.dbTables = dbTables;
        this.connector = connector;
    }

    @LifecycleStart
    public void start() {
        this.connector.createDataSourceTable();
        this.connector.createPendingSegmentsTable();
        this.connector.createSegmentTable();
    }

    @Override
    public Collection<DataSegment> retrieveUsedSegmentsForIntervals(String dataSource, List<Interval> intervals, Segments visibility) {
        if (intervals == null || intervals.isEmpty()) {
            throw new IAE("null/empty intervals", new Object[0]);
        }
        return this.doRetrieveUsedSegments(dataSource, intervals, visibility);
    }

    @Override
    public Collection<DataSegment> retrieveAllUsedSegments(String dataSource, Segments visibility) {
        return this.doRetrieveUsedSegments(dataSource, Collections.emptyList(), visibility);
    }

    private Collection<DataSegment> doRetrieveUsedSegments(String dataSource, List<Interval> intervals, Segments visibility) {
        return (Collection)this.connector.retryWithHandle(handle -> {
            if (visibility == Segments.ONLY_VISIBLE) {
                SegmentTimeline timeline = this.getTimelineForIntervalsWithHandle(handle, dataSource, intervals);
                return timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
            }
            return this.retrieveAllUsedSegmentsForIntervalsWithHandle(handle, dataSource, intervals);
        });
    }

    public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource) {
        String rawQueryString = "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true";
        String queryString = StringUtils.format((String)rawQueryString, (Object[])new Object[]{this.dbTables.getSegmentsTable()});
        return (List)this.connector.retryWithHandle(handle -> {
            Query query = (Query)handle.createQuery(queryString).bind("dataSource", dataSource);
            return query.map((index, r, ctx) -> new Pair(JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])r.getBytes("payload"), DataSegment.class), (Object)r.getString("created_date"))).list();
        });
    }

    @Override
    public List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval) {
        List matchingSegments = (List)this.connector.inReadOnlyTransaction((handle, status) -> {
            try (CloseableIterator<DataSegment> iterator = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).retrieveUnusedSegments(dataSource, Collections.singletonList(interval));){
                ImmutableList immutableList = ImmutableList.copyOf(iterator);
                return immutableList;
            }
        });
        log.info("Found %,d unused segments for %s for interval %s.", new Object[]{matchingSegments.size(), dataSource, interval});
        return matchingSegments;
    }

    @Override
    public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval) {
        Integer numSegmentsMarkedUnused = (Integer)this.connector.retryTransaction((handle, status) -> SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).markSegmentsUnused(dataSource, interval), 3, 10);
        log.info("Marked %,d segments unused for %s for interval %s.", new Object[]{numSegmentsMarkedUnused, dataSource, interval});
        return numSegmentsMarkedUnused;
    }

    private Set<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(Handle handle, String dataSource, Interval interval) throws IOException {
        HashSet<SegmentIdWithShardSpec> identifiers = new HashSet<SegmentIdWithShardSpec>();
        ResultIterator dbSegments = ((Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()})).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).map((ResultSetMapper)ByteArrayMapper.FIRST).iterator();
        while (dbSegments.hasNext()) {
            byte[] payload = (byte[])dbSegments.next();
            SegmentIdWithShardSpec identifier = (SegmentIdWithShardSpec)this.jsonMapper.readValue(payload, SegmentIdWithShardSpec.class);
            if (!interval.overlaps((ReadableInterval)identifier.getInterval())) continue;
            identifiers.add(identifier);
        }
        dbSegments.close();
        return identifiers;
    }

    private SegmentTimeline getTimelineForIntervalsWithHandle(Handle handle, String dataSource, List<Interval> intervals) throws IOException {
        try (CloseableIterator<DataSegment> iterator = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).retrieveUsedSegments(dataSource, intervals);){
            SegmentTimeline segmentTimeline = SegmentTimeline.forSegments(iterator);
            return segmentTimeline;
        }
    }

    private Collection<DataSegment> retrieveAllUsedSegmentsForIntervalsWithHandle(Handle handle, String dataSource, List<Interval> intervals) throws IOException {
        try (CloseableIterator<DataSegment> iterator = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).retrieveUsedSegments(dataSource, intervals);){
            ArrayList<DataSegment> retVal = new ArrayList<DataSegment>();
            iterator.forEachRemaining(retVal::add);
            ArrayList<DataSegment> arrayList = retVal;
            return arrayList;
        }
    }

    @Override
    public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) throws IOException {
        SegmentPublishResult result = this.announceHistoricalSegments(segments, null, null, null);
        if (!result.isSuccess()) {
            throw new ISE("announceHistoricalSegments failed with null metadata, should not happen.", new Object[0]);
        }
        return result.getSegments();
    }

    @Override
    public SegmentPublishResult announceHistoricalSegments(final Set<DataSegment> segments, final Set<DataSegment> segmentsToDrop, final @Nullable DataSourceMetadata startMetadata, final @Nullable DataSourceMetadata endMetadata) throws IOException {
        if (segments.isEmpty()) {
            throw new IllegalArgumentException("segment set must not be empty");
        }
        final String dataSource = segments.iterator().next().getDataSource();
        for (DataSegment segment : segments) {
            if (dataSource.equals(segment.getDataSource())) continue;
            throw new IllegalArgumentException("segments must all be from the same dataSource");
        }
        if (startMetadata == null && endMetadata != null || startMetadata != null && endMetadata == null) {
            throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
        }
        final HashSet<Object> usedSegments = new HashSet<Object>();
        List segmentHolders = SegmentTimeline.forSegments(segments).lookupWithIncompletePartitions(Intervals.ETERNITY);
        for (TimelineObjectHolder holder : segmentHolders) {
            for (PartitionChunk chunk : holder.getObject()) {
                usedSegments.add(chunk.getObject());
            }
        }
        final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
        try {
            return this.connector.retryTransaction(new TransactionCallback<SegmentPublishResult>(){

                public SegmentPublishResult inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                    DataStoreMetadataUpdateResult result;
                    definitelyNotUpdated.set(false);
                    if (startMetadata != null && (result = IndexerSQLMetadataStorageCoordinator.this.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata)) != DataStoreMetadataUpdateResult.SUCCESS) {
                        transactionStatus.setRollbackOnly();
                        definitelyNotUpdated.set(true);
                        if (result == DataStoreMetadataUpdateResult.FAILURE) {
                            throw new RuntimeException("Aborting transaction!");
                        }
                        if (result == DataStoreMetadataUpdateResult.TRY_AGAIN) {
                            throw new RetryTransactionException("Aborting transaction!");
                        }
                    }
                    if (segmentsToDrop != null && !segmentsToDrop.isEmpty() && (result = IndexerSQLMetadataStorageCoordinator.this.dropSegmentsWithHandle(handle, segmentsToDrop, dataSource)) != DataStoreMetadataUpdateResult.SUCCESS) {
                        transactionStatus.setRollbackOnly();
                        definitelyNotUpdated.set(true);
                        if (result == DataStoreMetadataUpdateResult.FAILURE) {
                            throw new RuntimeException("Aborting transaction!");
                        }
                        if (result == DataStoreMetadataUpdateResult.TRY_AGAIN) {
                            throw new RetryTransactionException("Aborting transaction!");
                        }
                    }
                    Set inserted = IndexerSQLMetadataStorageCoordinator.this.announceHistoricalSegmentBatch(handle, segments, usedSegments);
                    return SegmentPublishResult.ok((Set<DataSegment>)ImmutableSet.copyOf((Collection)inserted));
                }
            }, 3, this.getSqlMetadataMaxRetry());
        }
        catch (CallbackFailedException e) {
            if (definitelyNotUpdated.get()) {
                return SegmentPublishResult.fail(e.getMessage());
            }
            throw e;
        }
    }

    @Override
    public SegmentPublishResult commitMetadataOnly(final String dataSource, final DataSourceMetadata startMetadata, final DataSourceMetadata endMetadata) {
        if (dataSource == null) {
            throw new IllegalArgumentException("datasource name cannot be null");
        }
        if (startMetadata == null) {
            throw new IllegalArgumentException("start metadata cannot be null");
        }
        if (endMetadata == null) {
            throw new IllegalArgumentException("end metadata cannot be null");
        }
        final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
        try {
            return this.connector.retryTransaction(new TransactionCallback<SegmentPublishResult>(){

                public SegmentPublishResult inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                    definitelyNotUpdated.set(false);
                    DataStoreMetadataUpdateResult result = IndexerSQLMetadataStorageCoordinator.this.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata);
                    if (result != DataStoreMetadataUpdateResult.SUCCESS) {
                        transactionStatus.setRollbackOnly();
                        definitelyNotUpdated.set(true);
                        if (result == DataStoreMetadataUpdateResult.FAILURE) {
                            throw new RuntimeException("Aborting transaction!");
                        }
                        if (result == DataStoreMetadataUpdateResult.TRY_AGAIN) {
                            throw new RetryTransactionException("Aborting transaction!");
                        }
                    }
                    return SegmentPublishResult.ok((Set<DataSegment>)ImmutableSet.of());
                }
            }, 3, this.getSqlMetadataMaxRetry());
        }
        catch (CallbackFailedException e) {
            if (definitelyNotUpdated.get()) {
                return SegmentPublishResult.fail(e.getMessage());
            }
            throw e;
        }
    }

    @VisibleForTesting
    public int getSqlMetadataMaxRetry() {
        return 10;
    }

    @Override
    public Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(String dataSource, Interval allocateInterval, boolean skipSegmentLineageCheck, List<SegmentCreateRequest> requests) {
        Preconditions.checkNotNull((Object)dataSource, (Object)"dataSource");
        Preconditions.checkNotNull((Object)allocateInterval, (Object)"interval");
        Interval interval = allocateInterval.withChronology((Chronology)ISOChronology.getInstanceUTC());
        return (Map)this.connector.retryWithHandle(handle -> this.allocatePendingSegments(handle, dataSource, interval, skipSegmentLineageCheck, requests));
    }

    @Override
    public SegmentIdWithShardSpec allocatePendingSegment(String dataSource, String sequenceName, @Nullable String previousSegmentId, Interval interval, PartialShardSpec partialShardSpec, String maxVersion, boolean skipSegmentLineageCheck) {
        Preconditions.checkNotNull((Object)dataSource, (Object)"dataSource");
        Preconditions.checkNotNull((Object)sequenceName, (Object)"sequenceName");
        Preconditions.checkNotNull((Object)interval, (Object)"interval");
        Preconditions.checkNotNull((Object)maxVersion, (Object)"version");
        Interval allocateInterval = interval.withChronology((Chronology)ISOChronology.getInstanceUTC());
        return (SegmentIdWithShardSpec)this.connector.retryWithHandle(handle -> {
            if (skipSegmentLineageCheck) {
                return this.allocatePendingSegment(handle, dataSource, sequenceName, allocateInterval, partialShardSpec, maxVersion);
            }
            return this.allocatePendingSegmentWithSegmentLineageCheck(handle, dataSource, sequenceName, previousSegmentId, allocateInterval, partialShardSpec, maxVersion);
        });
    }

    @Nullable
    private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck(Handle handle, String dataSource, String sequenceName, @Nullable String previousSegmentId, Interval interval, PartialShardSpec partialShardSpec, String maxVersion) throws IOException {
        String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
        CheckExistingSegmentIdResult result = this.checkAndGetExistingSegmentId((Query<Map<String, Object>>)handle.createQuery(StringUtils.format((String)"SELECT payload FROM %s WHERE dataSource = :dataSource AND sequence_name = :sequence_name AND sequence_prev_id = :sequence_prev_id", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable()})), interval, sequenceName, previousSegmentIdNotNull, Pair.of((Object)"dataSource", (Object)dataSource), Pair.of((Object)"sequence_name", (Object)sequenceName), Pair.of((Object)"sequence_prev_id", (Object)previousSegmentIdNotNull));
        if (result.found) {
            return result.segmentIdentifier;
        }
        SegmentIdWithShardSpec newIdentifier = this.createNewSegment(handle, dataSource, interval, partialShardSpec, maxVersion);
        if (newIdentifier == null) {
            return null;
        }
        String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(Hashing.sha1().newHasher().putBytes(StringUtils.toUtf8((String)sequenceName)).putByte((byte)-1).putBytes(StringUtils.toUtf8((String)previousSegmentIdNotNull)).hash().asBytes());
        this.insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, previousSegmentIdNotNull, sequenceName, sequenceNamePrevIdSha1);
        return newIdentifier;
    }

    private Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(Handle handle, String dataSource, Interval interval, boolean skipSegmentLineageCheck, List<SegmentCreateRequest> requests) throws IOException {
        Map<SegmentCreateRequest, CheckExistingSegmentIdResult> existingSegmentIds = skipSegmentLineageCheck ? this.getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, requests) : this.getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, requests);
        HashMap<SegmentCreateRequest, SegmentIdWithShardSpec> allocatedSegmentIds = new HashMap<SegmentCreateRequest, SegmentIdWithShardSpec>();
        ArrayList<SegmentCreateRequest> requestsForNewSegments = new ArrayList<SegmentCreateRequest>();
        for (SegmentCreateRequest request : requests) {
            CheckExistingSegmentIdResult existingSegmentId = existingSegmentIds.get(request);
            if (existingSegmentId == null || !existingSegmentId.found) {
                requestsForNewSegments.add(request);
                continue;
            }
            if (existingSegmentId.segmentIdentifier != null) {
                log.info("Found valid existing segment [%s] for request.", new Object[]{existingSegmentId.segmentIdentifier});
                allocatedSegmentIds.put(request, existingSegmentId.segmentIdentifier);
                continue;
            }
            log.info("Found clashing existing segment [%s] for request.", new Object[]{existingSegmentId});
        }
        Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = this.createNewSegments(handle, dataSource, interval, skipSegmentLineageCheck, requestsForNewSegments);
        this.insertPendingSegmentsIntoMetastore(handle, createdSegments, dataSource, interval, skipSegmentLineageCheck);
        allocatedSegmentIds.putAll(createdSegments);
        return allocatedSegmentIds;
    }

    private String getSequenceNameAndPrevIdSha(SegmentCreateRequest request, Interval interval, boolean skipSegmentLineageCheck) {
        Hasher hasher = Hashing.sha1().newHasher().putBytes(StringUtils.toUtf8((String)request.getSequenceName())).putByte((byte)-1);
        if (skipSegmentLineageCheck) {
            hasher.putLong(interval.getStartMillis()).putLong(interval.getEndMillis());
        } else {
            hasher.putBytes(StringUtils.toUtf8((String)request.getPreviousSegmentId()));
        }
        return BaseEncoding.base16().encode(hasher.hash().asBytes());
    }

    @Nullable
    private SegmentIdWithShardSpec allocatePendingSegment(Handle handle, String dataSource, String sequenceName, Interval interval, PartialShardSpec partialShardSpec, String maxVersion) throws IOException {
        CheckExistingSegmentIdResult result = this.checkAndGetExistingSegmentId((Query<Map<String, Object>>)handle.createQuery(StringUtils.format((String)"SELECT payload FROM %s WHERE dataSource = :dataSource AND sequence_name = :sequence_name AND start = :start AND %2$send%2$s = :end", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()})), interval, sequenceName, null, Pair.of((Object)"dataSource", (Object)dataSource), Pair.of((Object)"sequence_name", (Object)sequenceName), Pair.of((Object)"start", (Object)interval.getStart().toString()), Pair.of((Object)"end", (Object)interval.getEnd().toString()));
        if (result.found) {
            return result.segmentIdentifier;
        }
        SegmentIdWithShardSpec newIdentifier = this.createNewSegment(handle, dataSource, interval, partialShardSpec, maxVersion);
        if (newIdentifier == null) {
            return null;
        }
        String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(Hashing.sha1().newHasher().putBytes(StringUtils.toUtf8((String)sequenceName)).putByte((byte)-1).putLong(interval.getStartMillis()).putLong(interval.getEndMillis()).hash().asBytes());
        this.insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
        log.info("Allocated pending segment [%s] for sequence[%s] in DB", new Object[]{newIdentifier, sequenceName});
        return newIdentifier;
    }

    private Map<SegmentCreateRequest, CheckExistingSegmentIdResult> getExistingSegmentIdsSkipLineageCheck(Handle handle, String dataSource, Interval interval, List<SegmentCreateRequest> requests) throws IOException {
        Query query = (Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT sequence_name, payload FROM %s WHERE dataSource = :dataSource AND start = :start AND %2$send%2$s = :end", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()})).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString());
        ResultIterator dbSegments = query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r)).iterator();
        HashMap<String, SegmentIdWithShardSpec> sequenceToSegmentId = new HashMap<String, SegmentIdWithShardSpec>();
        while (dbSegments.hasNext()) {
            PendingSegmentsRecord record = (PendingSegmentsRecord)dbSegments.next();
            SegmentIdWithShardSpec segmentId = (SegmentIdWithShardSpec)this.jsonMapper.readValue(record.getPayload(), SegmentIdWithShardSpec.class);
            sequenceToSegmentId.put(record.getSequenceName(), segmentId);
        }
        HashMap<SegmentCreateRequest, CheckExistingSegmentIdResult> requestToResult = new HashMap<SegmentCreateRequest, CheckExistingSegmentIdResult>();
        Iterator<SegmentCreateRequest> iterator = requests.iterator();
        while (iterator.hasNext()) {
            SegmentCreateRequest request;
            SegmentIdWithShardSpec segmentId = (SegmentIdWithShardSpec)sequenceToSegmentId.get((request = iterator.next()).getSequenceName());
            requestToResult.put(request, new CheckExistingSegmentIdResult(segmentId != null, segmentId));
        }
        return requestToResult;
    }

    private Map<SegmentCreateRequest, CheckExistingSegmentIdResult> getExistingSegmentIdsWithLineageCheck(Handle handle, String dataSource, Interval interval, List<SegmentCreateRequest> requests) throws IOException {
        String sql = StringUtils.format((String)"SELECT payload FROM %s WHERE dataSource = :dataSource AND sequence_name = :sequence_name AND sequence_prev_id = :sequence_prev_id", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable()});
        HashMap<SegmentCreateRequest, CheckExistingSegmentIdResult> requestToResult = new HashMap<SegmentCreateRequest, CheckExistingSegmentIdResult>();
        for (SegmentCreateRequest request : requests) {
            CheckExistingSegmentIdResult result = this.checkAndGetExistingSegmentId((Query<Map<String, Object>>)((Query)((Query)((Query)handle.createQuery(sql).bind("dataSource", dataSource)).bind("sequence_name", request.getSequenceName())).bind("sequence_prev_id", request.getPreviousSegmentId())), interval, request.getSequenceName(), request.getPreviousSegmentId(), new Pair[0]);
            requestToResult.put(request, result);
        }
        return requestToResult;
    }

    private CheckExistingSegmentIdResult checkAndGetExistingSegmentId(Query<Map<String, Object>> query, Interval interval, String sequenceName, @Nullable String previousSegmentId, Pair<String, String> ... queryVars) throws IOException {
        Query boundQuery = query;
        for (Pair<String, String> var : queryVars) {
            boundQuery = (Query)boundQuery.bind((String)var.lhs, (String)var.rhs);
        }
        List existingBytes = boundQuery.map((ResultSetMapper)ByteArrayMapper.FIRST).list();
        if (existingBytes.isEmpty()) {
            return new CheckExistingSegmentIdResult(false, null);
        }
        SegmentIdWithShardSpec existingIdentifier = (SegmentIdWithShardSpec)this.jsonMapper.readValue((byte[])Iterables.getOnlyElement((Iterable)existingBytes), SegmentIdWithShardSpec.class);
        if (existingIdentifier.getInterval().isEqual((ReadableInterval)interval)) {
            log.info("Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", new Object[]{existingIdentifier, sequenceName, previousSegmentId});
            return new CheckExistingSegmentIdResult(true, existingIdentifier);
        }
        log.warn("Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, does not match requested interval[%s]", new Object[]{existingIdentifier, sequenceName, previousSegmentId, interval});
        return new CheckExistingSegmentIdResult(true, null);
    }

    private void insertPendingSegmentsIntoMetastore(Handle handle, Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments, String dataSource, Interval interval, boolean skipSegmentLineageCheck) throws JsonProcessingException {
        PreparedBatch insertBatch = handle.prepareBatch(StringUtils.format((String)"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()}));
        HashMap segmentIdToRequest = new HashMap();
        createdSegments.forEach((request, segmentId) -> segmentIdToRequest.put(segmentId, request));
        for (Map.Entry entry : segmentIdToRequest.entrySet()) {
            SegmentCreateRequest request2 = (SegmentCreateRequest)entry.getValue();
            SegmentIdWithShardSpec segmentId2 = (SegmentIdWithShardSpec)entry.getKey();
            ((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)insertBatch.add().bind("id", segmentId2.toString())).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).bind("sequence_name", request2.getSequenceName())).bind("sequence_prev_id", request2.getPreviousSegmentId())).bind("sequence_name_prev_id_sha1", this.getSequenceNameAndPrevIdSha(request2, interval, skipSegmentLineageCheck))).bind("payload", this.jsonMapper.writeValueAsBytes((Object)segmentId2));
        }
        insertBatch.execute();
    }

    private void insertPendingSegmentIntoMetastore(Handle handle, SegmentIdWithShardSpec newIdentifier, String dataSource, Interval interval, String previousSegmentId, String sequenceName, String sequenceNamePrevIdSha1) throws JsonProcessingException {
        ((Update)((Update)((Update)((Update)((Update)((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()})).bind("id", newIdentifier.toString())).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).bind("sequence_name", sequenceName)).bind("sequence_prev_id", previousSegmentId)).bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)).bind("payload", this.jsonMapper.writeValueAsBytes((Object)newIdentifier))).execute();
    }

    private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(Handle handle, String dataSource, Interval interval, boolean skipSegmentLineageCheck, List<SegmentCreateRequest> requests) throws IOException {
        String versionOfExistingChunk;
        if (requests.isEmpty()) {
            return Collections.emptyMap();
        }
        List existingChunks = this.getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)).lookup(interval);
        if (existingChunks.size() > 1) {
            log.warn("Cannot allocate new segments for dataSource[%s], interval[%s]: already have [%,d] chunks.", new Object[]{dataSource, interval, existingChunks.size()});
            return Collections.emptyMap();
        }
        PartialShardSpec partialShardSpec = requests.get(0).getPartialShardSpec();
        SegmentIdWithShardSpec committedMaxId = null;
        if (existingChunks.isEmpty()) {
            versionOfExistingChunk = null;
        } else {
            TimelineObjectHolder existingHolder = (TimelineObjectHolder)Iterables.getOnlyElement((Iterable)existingChunks);
            versionOfExistingChunk = (String)existingHolder.getVersion();
            for (DataSegment segment2 : FluentIterable.from((Iterable)existingHolder.getObject()).transform(PartitionChunk::getObject).filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
                if (committedMaxId != null && committedMaxId.getShardSpec().getPartitionNum() >= segment2.getShardSpec().getPartitionNum()) continue;
                committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment2);
            }
        }
        Set<SegmentIdWithShardSpec> pendingSegments = this.getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
        HashMap<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = new HashMap<SegmentCreateRequest, SegmentIdWithShardSpec>();
        HashMap<String, SegmentIdWithShardSpec> sequenceHashToSegment = new HashMap<String, SegmentIdWithShardSpec>();
        for (SegmentCreateRequest request : requests) {
            SegmentIdWithShardSpec createdSegment;
            String sequenceHash = this.getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck);
            if (sequenceHashToSegment.containsKey(sequenceHash)) {
                createdSegment = (SegmentIdWithShardSpec)sequenceHashToSegment.get(sequenceHash);
            } else {
                createdSegment = this.createNewSegment(request, dataSource, interval, versionOfExistingChunk, committedMaxId, pendingSegments);
                if (createdSegment != null) {
                    pendingSegments.add(createdSegment);
                    sequenceHashToSegment.put(sequenceHash, createdSegment);
                    log.info("Created new segment [%s]", new Object[]{createdSegment});
                }
            }
            if (createdSegment == null) continue;
            createdSegments.put(request, createdSegment);
        }
        log.info("Created [%d] new segments for [%d] allocate requests.", new Object[]{sequenceHashToSegment.size(), requests.size()});
        return createdSegments;
    }

    private SegmentIdWithShardSpec createNewSegment(SegmentCreateRequest request, String dataSource, Interval interval, String versionOfExistingChunk, SegmentIdWithShardSpec committedMaxId, Set<SegmentIdWithShardSpec> pendingSegments) {
        PartialShardSpec partialShardSpec = request.getPartialShardSpec();
        String existingVersion = request.getVersion();
        if (committedMaxId != null) {
            pendingSegments.add(committedMaxId);
        }
        SegmentIdWithShardSpec overallMaxId = pendingSegments.stream().filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)).filter(id -> versionOfExistingChunk == null || id.getVersion().equals(versionOfExistingChunk)).max(Comparator.comparing(SegmentIdWithShardSpec::getVersion).thenComparing(id -> id.getShardSpec().getPartitionNum())).orElse(null);
        String newSegmentVersion = versionOfExistingChunk != null ? versionOfExistingChunk : (overallMaxId != null ? overallMaxId.getVersion() : null);
        if (overallMaxId == null) {
            int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() ? 32768 : 0;
            String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
            return new SegmentIdWithShardSpec(dataSource, interval, version, partialShardSpec.complete(this.jsonMapper, newPartitionId, 0));
        }
        if (!overallMaxId.getInterval().equals((Object)interval) || overallMaxId.getVersion().compareTo(existingVersion) > 0) {
            log.warn("Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", new Object[]{dataSource, interval, existingVersion, overallMaxId});
            return null;
        }
        if (committedMaxId != null && committedMaxId.getShardSpec().getNumCorePartitions() == -1) {
            log.warn("Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", new Object[]{committedMaxId, committedMaxId.getShardSpec()});
            return null;
        }
        return new SegmentIdWithShardSpec(dataSource, interval, (String)Preconditions.checkNotNull((Object)newSegmentVersion, (Object)"newSegmentVersion"), partialShardSpec.complete(this.jsonMapper, overallMaxId.getShardSpec().getPartitionNum() + 1, committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()));
    }

    @Nullable
    private SegmentIdWithShardSpec createNewSegment(Handle handle, String dataSource, Interval interval, PartialShardSpec partialShardSpec, String existingVersion) throws IOException {
        String versionOfExistingChunk;
        List existingChunks = this.getTimelineForIntervalsWithHandle(handle, dataSource, (List<Interval>)ImmutableList.of((Object)interval)).lookup(interval);
        if (existingChunks.size() > 1) {
            log.warn("Cannot allocate new segment for dataSource[%s], interval[%s]: already have [%,d] chunks.", new Object[]{dataSource, interval, existingChunks.size()});
            return null;
        }
        SegmentIdWithShardSpec committedMaxId = null;
        if (existingChunks.isEmpty()) {
            versionOfExistingChunk = null;
        } else {
            TimelineObjectHolder existingHolder = (TimelineObjectHolder)Iterables.getOnlyElement((Iterable)existingChunks);
            versionOfExistingChunk = (String)existingHolder.getVersion();
            for (DataSegment segment2 : FluentIterable.from((Iterable)existingHolder.getObject()).transform(PartitionChunk::getObject).filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
                if (committedMaxId != null && committedMaxId.getShardSpec().getPartitionNum() >= segment2.getShardSpec().getPartitionNum()) continue;
                committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment2);
            }
        }
        Set<SegmentIdWithShardSpec> pendings = this.getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
        if (committedMaxId != null) {
            pendings.add(committedMaxId);
        }
        SegmentIdWithShardSpec overallMaxId = pendings.stream().filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)).filter(id -> versionOfExistingChunk == null || id.getVersion().equals(versionOfExistingChunk)).max(Comparator.comparing(SegmentIdWithShardSpec::getVersion).thenComparing(id -> id.getShardSpec().getPartitionNum())).orElse(null);
        String newSegmentVersion = versionOfExistingChunk != null ? versionOfExistingChunk : (overallMaxId != null ? overallMaxId.getVersion() : null);
        if (overallMaxId == null) {
            int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() ? 32768 : 0;
            String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
            return new SegmentIdWithShardSpec(dataSource, interval, version, partialShardSpec.complete(this.jsonMapper, newPartitionId, 0));
        }
        if (!overallMaxId.getInterval().equals((Object)interval) || overallMaxId.getVersion().compareTo(existingVersion) > 0) {
            log.warn("Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", new Object[]{dataSource, interval, existingVersion, overallMaxId});
            return null;
        }
        if (committedMaxId != null && committedMaxId.getShardSpec().getNumCorePartitions() == -1) {
            log.warn("Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", new Object[]{committedMaxId, committedMaxId.getShardSpec()});
            return null;
        }
        return new SegmentIdWithShardSpec(dataSource, interval, (String)Preconditions.checkNotNull((Object)newSegmentVersion, (Object)"newSegmentVersion"), partialShardSpec.complete(this.jsonMapper, overallMaxId.getShardSpec().getPartitionNum() + 1, committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()));
    }

    @Override
    public int deletePendingSegmentsCreatedInInterval(String dataSource, Interval deleteInterval) {
        return (Integer)this.connector.getDBI().inTransaction((handle, status) -> ((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"DELETE FROM %s WHERE datasource = :dataSource AND created_date >= :start AND created_date < :end", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable()})).bind("dataSource", dataSource)).bind("start", deleteInterval.getStart().toString())).bind("end", deleteInterval.getEnd().toString())).execute());
    }

    @Override
    public int deletePendingSegments(String dataSource) {
        return (Integer)this.connector.getDBI().inTransaction((handle, status) -> ((Update)handle.createStatement(StringUtils.format((String)"DELETE FROM %s WHERE datasource = :dataSource", (Object[])new Object[]{this.dbTables.getPendingSegmentsTable()})).bind("dataSource", dataSource)).execute());
    }

    private Set<DataSegment> announceHistoricalSegmentBatch(Handle handle, Set<DataSegment> segments, Set<DataSegment> usedSegments) throws IOException {
        HashSet<DataSegment> toInsertSegments = new HashSet<DataSegment>();
        try {
            Set<String> existedSegments = this.segmentExistsBatch(handle, segments);
            log.info("Found these segments already exist in DB: %s", new Object[]{existedSegments});
            for (DataSegment segment : segments) {
                if (existedSegments.contains(segment.getId().toString())) continue;
                toInsertSegments.add(segment);
            }
            List partitionedSegments = Lists.partition(new ArrayList(toInsertSegments), (int)100);
            PreparedBatch preparedBatch = handle.prepareBatch(StringUtils.format((String)"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", (Object[])new Object[]{this.dbTables.getSegmentsTable(), this.connector.getQuoteString()}));
            for (List partition : partitionedSegments) {
                for (DataSegment segment : partition) {
                    ((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)((PreparedBatchPart)preparedBatch.add().bind("id", segment.getId().toString())).bind("dataSource", segment.getDataSource())).bind("created_date", DateTimes.nowUtc().toString())).bind("start", segment.getInterval().getStart().toString())).bind("end", segment.getInterval().getEnd().toString())).bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec))).bind("version", segment.getVersion())).bind("used", usedSegments.contains(segment))).bind("payload", this.jsonMapper.writeValueAsBytes((Object)segment));
                }
                int[] affectedRows = preparedBatch.execute();
                boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
                if (succeeded) {
                    log.infoSegments((Collection)partition, "Published segments to DB");
                    continue;
                }
                List failedToPublish = IntStream.range(0, partition.size()).filter(i -> affectedRows[i] != 1).mapToObj(partition::get).collect(Collectors.toList());
                throw new ISE("Failed to publish segments to DB: %s", new Object[]{SegmentUtils.commaSeparatedIdentifiers(failedToPublish)});
            }
        }
        catch (Exception e) {
            log.errorSegments(segments, "Exception inserting segments");
            throw e;
        }
        return toInsertSegments;
    }

    private Set<String> segmentExistsBatch(Handle handle, Set<DataSegment> segments) {
        HashSet<String> existedSegments = new HashSet<String>();
        List segmentsLists = Lists.partition(new ArrayList<DataSegment>(segments), (int)100);
        for (List segmentList : segmentsLists) {
            String segmentIds = segmentList.stream().map(segment -> "'" + StringEscapeUtils.escapeSql((String)segment.getId().toString()) + "'").collect(Collectors.joining(","));
            List existIds = handle.createQuery(StringUtils.format((String)"SELECT id FROM %s WHERE id in (%s)", (Object[])new Object[]{this.dbTables.getSegmentsTable(), segmentIds})).mapTo(String.class).list();
            existedSegments.addAll(existIds);
        }
        return existedSegments;
    }

    @Override
    @Nullable
    public DataSourceMetadata retrieveDataSourceMetadata(String dataSource) {
        byte[] bytes = this.connector.lookup(this.dbTables.getDataSourceTable(), "dataSource", "commit_metadata_payload", dataSource);
        if (bytes == null) {
            return null;
        }
        return (DataSourceMetadata)JacksonUtils.readValue((ObjectMapper)this.jsonMapper, (byte[])bytes, DataSourceMetadata.class);
    }

    @Nullable
    private byte[] retrieveDataSourceMetadataWithHandleAsBytes(Handle handle, String dataSource) {
        return this.connector.lookupWithHandle(handle, this.dbTables.getDataSourceTable(), "dataSource", "commit_metadata_payload", dataSource);
    }

    protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(Handle handle, String dataSource, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata) throws IOException {
        DataStoreMetadataUpdateResult retVal;
        DataSourceMetadata oldCommitMetadataFromDb;
        String oldCommitMetadataSha1FromDb;
        Preconditions.checkNotNull((Object)dataSource, (Object)"dataSource");
        Preconditions.checkNotNull((Object)startMetadata, (Object)"startMetadata");
        Preconditions.checkNotNull((Object)endMetadata, (Object)"endMetadata");
        byte[] oldCommitMetadataBytesFromDb = this.retrieveDataSourceMetadataWithHandleAsBytes(handle, dataSource);
        if (oldCommitMetadataBytesFromDb == null) {
            oldCommitMetadataSha1FromDb = null;
            oldCommitMetadataFromDb = null;
        } else {
            oldCommitMetadataSha1FromDb = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes());
            oldCommitMetadataFromDb = (DataSourceMetadata)this.jsonMapper.readValue(oldCommitMetadataBytesFromDb, DataSourceMetadata.class);
        }
        boolean startMetadataMatchesExisting = oldCommitMetadataFromDb == null ? startMetadata.isValidStart() : startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
        if (!startMetadataMatchesExisting) {
            log.error("Not updating metadata, existing state[%s] in metadata store doesn't match to the new start state[%s].", new Object[]{oldCommitMetadataFromDb, startMetadata});
            return DataStoreMetadataUpdateResult.FAILURE;
        }
        DataSourceMetadata newCommitMetadata = oldCommitMetadataFromDb == null ? endMetadata : oldCommitMetadataFromDb.plus(endMetadata);
        byte[] newCommitMetadataBytes = this.jsonMapper.writeValueAsBytes((Object)newCommitMetadata);
        String newCommitMetadataSha1 = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes());
        if (oldCommitMetadataBytesFromDb == null) {
            int numRows = ((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", (Object[])new Object[]{this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("commit_metadata_payload", newCommitMetadataBytes)).bind("commit_metadata_sha1", newCommitMetadataSha1)).execute();
            retVal = numRows == 1 ? DataStoreMetadataUpdateResult.SUCCESS : DataStoreMetadataUpdateResult.TRY_AGAIN;
        } else {
            int numRows = ((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET commit_metadata_payload = :new_commit_metadata_payload, commit_metadata_sha1 = :new_commit_metadata_sha1 WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", (Object[])new Object[]{this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb)).bind("new_commit_metadata_payload", newCommitMetadataBytes)).bind("new_commit_metadata_sha1", newCommitMetadataSha1)).execute();
            DataStoreMetadataUpdateResult dataStoreMetadataUpdateResult = retVal = numRows == 1 ? DataStoreMetadataUpdateResult.SUCCESS : DataStoreMetadataUpdateResult.TRY_AGAIN;
        }
        if (retVal == DataStoreMetadataUpdateResult.SUCCESS) {
            log.info("Updated metadata from[%s] to[%s].", new Object[]{oldCommitMetadataFromDb, newCommitMetadata});
        } else {
            log.info("Not updating metadata, compare-and-swap failure.", new Object[0]);
        }
        return retVal;
    }

    protected DataStoreMetadataUpdateResult dropSegmentsWithHandle(Handle handle, Collection<DataSegment> segmentsToDrop, String dataSource) {
        Preconditions.checkNotNull((Object)dataSource, (Object)"dataSource");
        Preconditions.checkNotNull(segmentsToDrop, (Object)"segmentsToDrop");
        if (segmentsToDrop.isEmpty()) {
            return DataStoreMetadataUpdateResult.SUCCESS;
        }
        if (segmentsToDrop.stream().anyMatch(segment -> !dataSource.equals(segment.getDataSource()))) {
            log.error("Not dropping segments, as not all segments belong to the datasource[%s].", new Object[]{dataSource});
            return DataStoreMetadataUpdateResult.FAILURE;
        }
        int numChangedSegments = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables, this.jsonMapper).markSegments(segmentsToDrop.stream().map(DataSegment::getId).collect(Collectors.toList()), false);
        if (numChangedSegments != segmentsToDrop.size()) {
            log.warn("Failed to drop segments metadata update as numChangedSegments[%s] segmentsToDropSize[%s]", new Object[]{numChangedSegments, segmentsToDrop.size()});
            return DataStoreMetadataUpdateResult.TRY_AGAIN;
        }
        return DataStoreMetadataUpdateResult.SUCCESS;
    }

    @Override
    public boolean deleteDataSourceMetadata(final String dataSource) {
        return this.connector.retryWithHandle(new HandleCallback<Boolean>(){

            public Boolean withHandle(Handle handle) {
                int rows = ((Update)handle.createStatement(StringUtils.format((String)"DELETE from %s WHERE dataSource = :dataSource", (Object[])new Object[]{IndexerSQLMetadataStorageCoordinator.this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).execute();
                return rows > 0;
            }
        });
    }

    @Override
    public boolean resetDataSourceMetadata(final String dataSource, DataSourceMetadata dataSourceMetadata) throws IOException {
        final byte[] newCommitMetadataBytes = this.jsonMapper.writeValueAsBytes((Object)dataSourceMetadata);
        final String newCommitMetadataSha1 = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes());
        return this.connector.retryWithHandle(new HandleCallback<Boolean>(){

            public Boolean withHandle(Handle handle) {
                int numRows = ((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET commit_metadata_payload = :new_commit_metadata_payload, commit_metadata_sha1 = :new_commit_metadata_sha1 WHERE dataSource = :dataSource", (Object[])new Object[]{IndexerSQLMetadataStorageCoordinator.this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("new_commit_metadata_payload", newCommitMetadataBytes)).bind("new_commit_metadata_sha1", newCommitMetadataSha1)).execute();
                return numRows == 1;
            }
        });
    }

    @Override
    public void updateSegmentMetadata(final Set<DataSegment> segments) {
        this.connector.getDBI().inTransaction((TransactionCallback)new TransactionCallback<Void>(){

            public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                for (DataSegment segment : segments) {
                    IndexerSQLMetadataStorageCoordinator.this.updatePayload(handle, segment);
                }
                return null;
            }
        });
    }

    @Override
    public void deleteSegments(final Set<DataSegment> segments) {
        this.connector.getDBI().inTransaction((TransactionCallback)new TransactionCallback<Void>(){

            public Void inTransaction(Handle handle, TransactionStatus transactionStatus) {
                int segmentSize = segments.size();
                String dataSource = "";
                for (DataSegment segment : segments) {
                    dataSource = segment.getDataSource();
                    IndexerSQLMetadataStorageCoordinator.this.deleteSegment(handle, segment);
                }
                log.debugSegments((Collection)segments, "Delete the metadata of segments");
                log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", new Object[]{segmentSize, dataSource});
                return null;
            }
        });
    }

    private void deleteSegment(Handle handle, DataSegment segment) {
        ((Update)handle.createStatement(StringUtils.format((String)"DELETE from %s WHERE id = :id", (Object[])new Object[]{this.dbTables.getSegmentsTable()})).bind("id", segment.getId().toString())).execute();
    }

    private void updatePayload(Handle handle, DataSegment segment) throws IOException {
        try {
            ((Update)((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET payload = :payload WHERE id = :id", (Object[])new Object[]{this.dbTables.getSegmentsTable()})).bind("id", segment.getId().toString())).bind("payload", this.jsonMapper.writeValueAsBytes((Object)segment))).execute();
        }
        catch (IOException e) {
            log.error((Throwable)e, "Exception inserting into DB", new Object[0]);
            throw e;
        }
    }

    @Override
    public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata) {
        return 1 == (Integer)this.connector.getDBI().inTransaction((handle, status) -> ((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", (Object[])new Object[]{this.dbTables.getDataSourceTable()})).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("commit_metadata_payload", this.jsonMapper.writeValueAsBytes((Object)metadata))).bind("commit_metadata_sha1", BaseEncoding.base16().encode(Hashing.sha1().hashBytes(this.jsonMapper.writeValueAsBytes((Object)metadata)).asBytes()))).execute());
    }

    @Override
    public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set<String> excludeDatasources) {
        DateTime dateTime = DateTimes.utc((long)timestamp);
        List datasourcesToDelete = (List)this.connector.getDBI().withHandle(handle -> handle.createQuery(StringUtils.format((String)"SELECT dataSource FROM %1$s WHERE created_date < '%2$s'", (Object[])new Object[]{this.dbTables.getDataSourceTable(), dateTime.toString()})).mapTo(String.class).list());
        datasourcesToDelete.removeAll(excludeDatasources);
        return (Integer)this.connector.getDBI().withHandle(handle -> {
            PreparedBatch batch = handle.prepareBatch(StringUtils.format((String)"DELETE FROM %1$s WHERE dataSource = :dataSource AND created_date < '%2$s'", (Object[])new Object[]{this.dbTables.getDataSourceTable(), dateTime.toString()}));
            for (String datasource : datasourcesToDelete) {
                ((PreparedBatch)batch.bind("dataSource", datasource)).add();
            }
            int[] result = batch.execute();
            return IntStream.of(result).sum();
        });
    }

    private static class PendingSegmentsRecord {
        private final String sequenceName;
        private final byte[] payload;

        static PendingSegmentsRecord fromResultSet(ResultSet resultSet) {
            try {
                return new PendingSegmentsRecord(resultSet.getString(1), resultSet.getBytes(2));
            }
            catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }

        PendingSegmentsRecord(String sequenceName, byte[] payload) {
            this.payload = payload;
            this.sequenceName = sequenceName;
        }

        public byte[] getPayload() {
            return this.payload;
        }

        public String getSequenceName() {
            return this.sequenceName;
        }
    }

    private static class CheckExistingSegmentIdResult {
        private final boolean found;
        @Nullable
        private final SegmentIdWithShardSpec segmentIdentifier;

        CheckExistingSegmentIdResult(boolean found, @Nullable SegmentIdWithShardSpec segmentIdentifier) {
            this.found = found;
            this.segmentIdentifier = segmentIdentifier;
        }
    }

    static enum DataStoreMetadataUpdateResult {
        SUCCESS,
        FAILURE,
        TRY_AGAIN;

    }
}

