/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexing.kafka;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
import io.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.kafka.KafkaDataSourceMetadata;
import io.druid.indexing.kafka.KafkaIOConfig;
import io.druid.indexing.kafka.KafkaPartitions;
import io.druid.indexing.kafka.KafkaTuningConfig;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.collect.Utils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.metrics.Monitor;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorConfig;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.appenderator.UsedSegmentChecker;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
import io.druid.server.security.AuthorizationUtils;
import io.druid.server.security.AuthorizerMapper;
import io.druid.server.security.ForbiddenException;
import io.druid.server.security.Resource;
import io.druid.server.security.ResourceAction;
import io.druid.server.security.ResourceType;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.DateTime;
import org.joda.time.ReadableInstant;

public class KafkaIndexTask
extends AbstractTask
implements ChatHandler {
    public static final long PAUSE_FOREVER = -1L;
    private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class);
    private static final String TYPE = "index_kafka";
    private static final Random RANDOM = new Random();
    private static final long POLL_TIMEOUT = 100L;
    private static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15L;
    private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
    private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
    private final DataSchema dataSchema;
    private final InputRowParser<ByteBuffer> parser;
    private final KafkaTuningConfig tuningConfig;
    private final KafkaIOConfig ioConfig;
    private final AuthorizerMapper authorizerMapper;
    private final Optional<ChatHandlerProvider> chatHandlerProvider;
    private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<Integer, Long>();
    private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<Integer, Long>();
    private final Map<Integer, Long> maxEndOffsets = new HashMap<Integer, Long>();
    private final Map<Integer, Long> lastPersistedOffsets = new ConcurrentHashMap<Integer, Long>();
    private TaskToolbox toolbox;
    private volatile Appenderator appenderator = null;
    private volatile StreamAppenderatorDriver driver = null;
    private volatile FireDepartmentMetrics fireDepartmentMetrics = null;
    private volatile DateTime startTime;
    private volatile Status status = Status.NOT_STARTED;
    private volatile Thread runThread = null;
    private volatile File sequencesPersistFile = null;
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final AtomicBoolean publishOnStop = new AtomicBoolean(false);
    private final Lock pauseLock = new ReentrantLock();
    private final Condition hasPaused = this.pauseLock.newCondition();
    private final Condition shouldResume = this.pauseLock.newCondition();
    private final Lock pollRetryLock = new ReentrantLock();
    private final Condition isAwaitingRetry = this.pollRetryLock.newCondition();
    private final Object statusLock = new Object();
    private volatile boolean pauseRequested = false;
    private volatile long pauseMillis = 0L;
    private long pollRetryMs = 30000L;
    private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
    private final BlockingQueue<SequenceMetadata> publishQueue = new LinkedBlockingQueue<SequenceMetadata>();
    private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new CopyOnWriteArrayList<ListenableFuture<SegmentsAndMetadata>>();
    private final CountDownLatch waitForPublishes = new CountDownLatch(1);
    private final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference();
    private final String topic;
    private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
    private ListeningExecutorService publishExecService;
    private final boolean useLegacy;

    @JsonCreator
    public KafkaIndexTask(@JsonProperty(value="id") String id, @JsonProperty(value="resource") TaskResource taskResource, @JsonProperty(value="dataSchema") DataSchema dataSchema, @JsonProperty(value="tuningConfig") KafkaTuningConfig tuningConfig, @JsonProperty(value="ioConfig") KafkaIOConfig ioConfig, @JsonProperty(value="context") Map<String, Object> context, @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject AuthorizerMapper authorizerMapper) {
        super(id == null ? KafkaIndexTask.makeTaskId(dataSchema.getDataSource(), RANDOM.nextInt()) : id, StringUtils.format((String)"%s_%s", (Object[])new Object[]{TYPE, dataSchema.getDataSource()}), taskResource, dataSchema.getDataSource(), context);
        this.dataSchema = (DataSchema)Preconditions.checkNotNull((Object)dataSchema, (Object)"dataSchema");
        this.parser = (InputRowParser)Preconditions.checkNotNull((Object)dataSchema.getParser(), (Object)"parser");
        this.tuningConfig = (KafkaTuningConfig)Preconditions.checkNotNull((Object)tuningConfig, (Object)"tuningConfig");
        this.ioConfig = (KafkaIOConfig)Preconditions.checkNotNull((Object)ioConfig, (Object)"ioConfig");
        this.chatHandlerProvider = Optional.fromNullable((Object)chatHandlerProvider);
        this.authorizerMapper = authorizerMapper;
        this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap());
        this.maxEndOffsets.putAll(this.endOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, integerLongEntry -> Long.MAX_VALUE)));
        this.topic = ioConfig.getStartPartitions().getTopic();
        this.sequences = new CopyOnWriteArrayList();
        this.useLegacy = context == null || context.get("IS_INCREMENTAL_HANDOFF_SUPPORTED") == null || (Boolean)context.get("IS_INCREMENTAL_HANDOFF_SUPPORTED") == false;
    }

    @VisibleForTesting
    void setPollRetryMs(long retryMs) {
        this.pollRetryMs = retryMs;
    }

    private static String makeTaskId(String dataSource, int randomBits) {
        StringBuilder suffix = new StringBuilder(8);
        for (int i = 0; i < 8; ++i) {
            suffix.append((char)(97 + (randomBits >>> i * 4 & 0xF)));
        }
        return Joiner.on((String)"_").join((Object)TYPE, (Object)dataSource, new Object[]{suffix});
    }

    public int getPriority() {
        return (Integer)this.getContextValue("priority", 75);
    }

    public String getType() {
        return TYPE;
    }

    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        return true;
    }

    @JsonProperty
    public DataSchema getDataSchema() {
        return this.dataSchema;
    }

    @JsonProperty
    public KafkaTuningConfig getTuningConfig() {
        return this.tuningConfig;
    }

    @JsonProperty(value="ioConfig")
    public KafkaIOConfig getIOConfig() {
        return this.ioConfig;
    }

    private void createAndStartPublishExecutor() {
        this.publishExecService = MoreExecutors.listeningDecorator((ExecutorService)Execs.singleThreaded((String)"publish-driver"));
        this.publishExecService.submit(() -> {
            try {
                while (true) {
                    SequenceMetadata sequenceMetadata = this.publishQueue.take();
                    Preconditions.checkNotNull((Object)this.driver);
                    if (sequenceMetadata.isSentinel()) {
                        this.waitForPublishes.countDown();
                        break;
                    }
                    log.info("Publishing segments for sequence [%s]", new Object[]{sequenceMetadata});
                    SegmentsAndMetadata result = (SegmentsAndMetadata)this.driver.publish(sequenceMetadata.getPublisher(this.toolbox, this.ioConfig.isUseTransaction()), (Committer)sequenceMetadata.getCommitterSupplier(this.topic, this.lastPersistedOffsets).get(), (Collection)ImmutableList.of((Object)sequenceMetadata.getSequenceName())).get();
                    if (result == null) {
                        throw new ISE("Transaction failure publishing segments for sequence [%s]", new Object[]{sequenceMetadata});
                    }
                    log.info("Published segments[%s] with metadata[%s].", new Object[]{Joiner.on((String)", ").join((Iterable)result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList())), Preconditions.checkNotNull((Object)result.getCommitMetadata(), (Object)"commitMetadata")});
                    this.sequences.remove(sequenceMetadata);
                    this.publishingSequences.remove(sequenceMetadata.getSequenceName());
                    try {
                        this.persistSequences();
                    }
                    catch (IOException e) {
                        log.error((Throwable)e, "Unable to persist state, dying", new Object[0]);
                        Throwables.propagate((Throwable)e);
                    }
                    ListenableFuture handOffFuture = this.driver.registerHandoff(result);
                    this.handOffWaitList.add((ListenableFuture<SegmentsAndMetadata>)handOffFuture);
                }
            }
            catch (Throwable t) {
                if (t instanceof InterruptedException || t instanceof RejectedExecutionException && t.getCause() instanceof InterruptedException) {
                    log.warn("Stopping publish thread as we are interrupted, probably we are shutting down", new Object[0]);
                } else {
                    log.makeAlert(t, "Error in publish thread, dying", new Object[0]).emit();
                    this.throwableAtomicReference.set(t);
                }
                Futures.allAsList(this.handOffWaitList).cancel(true);
                this.waitForPublishes.countDown();
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TaskStatus run(TaskToolbox toolbox) throws Exception {
        if (this.useLegacy) {
            return this.runLegacy(toolbox);
        }
        log.info("Starting up!", new Object[0]);
        this.startTime = DateTimes.nowUtc();
        this.status = Status.STARTING;
        this.toolbox = toolbox;
        if (this.getContext() != null && this.getContext().get("checkpoints") != null) {
            log.info("Got checkpoints [%s]", new Object[]{(String)this.getContext().get("checkpoints")});
            TreeMap checkpoints = (TreeMap)toolbox.getObjectMapper().readValue((String)this.getContext().get("checkpoints"), (TypeReference)new TypeReference<TreeMap<Integer, Map<Integer, Long>>>(){});
            Iterator sequenceOffsets = checkpoints.entrySet().iterator();
            Map.Entry previous = sequenceOffsets.next();
            while (sequenceOffsets.hasNext()) {
                Map.Entry current = sequenceOffsets.next();
                this.sequences.add(new SequenceMetadata((Integer)previous.getKey(), StringUtils.format((String)"%s_%s", (Object[])new Object[]{this.ioConfig.getBaseSequenceName(), previous.getKey()}), (Map)previous.getValue(), (Map)current.getValue(), true));
                previous = current;
            }
            this.sequences.add(new SequenceMetadata((Integer)previous.getKey(), StringUtils.format((String)"%s_%s", (Object[])new Object[]{this.ioConfig.getBaseSequenceName(), previous.getKey()}), (Map)previous.getValue(), this.maxEndOffsets, false));
        } else {
            this.sequences.add(new SequenceMetadata(0, StringUtils.format((String)"%s_%s", (Object[])new Object[]{this.ioConfig.getBaseSequenceName(), 0}), this.ioConfig.getStartPartitions().getPartitionOffsetMap(), this.maxEndOffsets, false));
        }
        this.sequencesPersistFile = new File(toolbox.getPersistDir(), "sequences.json");
        this.restoreSequences();
        log.info("Starting with sequences:  %s", new Object[]{this.sequences});
        if (this.chatHandlerProvider.isPresent()) {
            log.info("Found chat handler of class[%s]", new Object[]{((ChatHandlerProvider)this.chatHandlerProvider.get()).getClass().getName()});
            ((ChatHandlerProvider)this.chatHandlerProvider.get()).register(this.getId(), (ChatHandler)this, false);
        } else {
            log.warn("No chat handler detected", new Object[0]);
        }
        this.runThread = Thread.currentThread();
        FireDepartment fireDepartmentForMetrics = new FireDepartment(this.dataSchema, new RealtimeIOConfig(null, null, null), null);
        this.fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
        toolbox.getMonitorScheduler().addMonitor((Monitor)new RealtimeMetricsMonitor((List)ImmutableList.of((Object)fireDepartmentForMetrics), (Map)ImmutableMap.of((Object)"taskId", (Object)new String[]{this.getId()})));
        LookupNodeService lookupNodeService = this.getContextValue("lookupTier") == null ? toolbox.getLookupNodeService() : new LookupNodeService((String)this.getContextValue("lookupTier"));
        DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(toolbox.getDruidNode(), "peon", (Map)ImmutableMap.of((Object)toolbox.getDataNodeService().getName(), (Object)toolbox.getDataNodeService(), (Object)lookupNodeService.getName(), (Object)lookupNodeService));
        try (KafkaConsumer<byte[], byte[]> consumer = this.newConsumer();){
            Iterator<SequenceMetadata> records;
            toolbox.getDataSegmentServerAnnouncer().announce();
            toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
            this.appenderator = this.newAppenderator(this.fireDepartmentMetrics, toolbox);
            this.driver = this.newDriver(this.appenderator, toolbox, this.fireDepartmentMetrics);
            this.createAndStartPublishExecutor();
            String topic = this.ioConfig.getStartPartitions().getTopic();
            Object restoredMetadata = this.driver.startJob();
            if (restoredMetadata == null) {
                Preconditions.checkState((boolean)this.sequences.get(0).startOffsets.entrySet().stream().allMatch(partitionOffsetEntry -> Longs.compare((long)((Long)partitionOffsetEntry.getValue()), (long)this.ioConfig.getStartPartitions().getPartitionOffsetMap().get(partitionOffsetEntry.getKey())) >= 0), (Object)"Sequence offsets are not compatible with start offsets of task");
                this.nextOffsets.putAll(this.sequences.get(0).startOffsets);
            } else {
                Map restoredMetadataMap = (Map)restoredMetadata;
                KafkaPartitions restoredNextPartitions = (KafkaPartitions)toolbox.getObjectMapper().convertValue(restoredMetadataMap.get(METADATA_NEXT_PARTITIONS), KafkaPartitions.class);
                this.nextOffsets.putAll(restoredNextPartitions.getPartitionOffsetMap());
                if (!restoredNextPartitions.getTopic().equals(this.ioConfig.getStartPartitions().getTopic())) {
                    throw new ISE("WTF?! Restored topic[%s] but expected topic[%s]", new Object[]{restoredNextPartitions.getTopic(), this.ioConfig.getStartPartitions().getTopic()});
                }
                if (!this.nextOffsets.keySet().equals(this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) {
                    throw new ISE("WTF?! Restored partitions[%s] but expected partitions[%s]", new Object[]{this.nextOffsets.keySet(), this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()});
                }
                if (this.sequences.size() == 0 || this.sequences.get(this.sequences.size() - 1).isCheckpointed()) {
                    this.endOffsets.putAll(this.sequences.size() == 0 ? this.nextOffsets : this.sequences.get(this.sequences.size() - 1).getEndOffsets());
                    log.info("End offsets changed to [%s]", new Object[]{this.endOffsets});
                }
            }
            Supplier committerSupplier = () -> {
                ImmutableMap snapshot = ImmutableMap.copyOf(this.nextOffsets);
                this.lastPersistedOffsets.clear();
                this.lastPersistedOffsets.putAll((Map<Integer, Long>)snapshot);
                return new Committer((Map)snapshot){
                    final /* synthetic */ Map val$snapshot;
                    {
                        this.val$snapshot = map;
                    }

                    public Object getMetadata() {
                        return ImmutableMap.of((Object)KafkaIndexTask.METADATA_NEXT_PARTITIONS, (Object)new KafkaPartitions(KafkaIndexTask.this.ioConfig.getStartPartitions().getTopic(), this.val$snapshot));
                    }

                    public void run() {
                    }
                };
            };
            this.maybePersistAndPublishSequences((Supplier<Committer>)committerSupplier);
            Set<Integer> assignment = this.assignPartitionsAndSeekToNext(consumer, topic);
            boolean stillReading = !assignment.isEmpty();
            this.status = Status.READING;
            try {
                while (stillReading) {
                    if (this.possiblyPause(assignment) && (assignment = this.assignPartitionsAndSeekToNext(consumer, topic)).isEmpty()) {
                        log.info("All partitions have been fully read", new Object[0]);
                        this.publishOnStop.set(true);
                        this.stopRequested.set(true);
                    }
                    if (this.stopRequested.get() || this.sequences.get(this.sequences.size() - 1).isCheckpointed() && !this.ioConfig.isPauseAfterRead()) {
                        this.status = Status.PUBLISHING;
                    }
                    if (this.stopRequested.get()) {
                        break;
                    }
                    this.checkAndMaybeThrowException();
                    if (!this.ioConfig.isPauseAfterRead()) {
                        this.maybePersistAndPublishSequences((Supplier<Committer>)committerSupplier);
                    }
                    records = ConsumerRecords.empty();
                    try {
                        records = consumer.poll(100L);
                    }
                    catch (OffsetOutOfRangeException e) {
                        log.warn("OffsetOutOfRangeException with message [%s]", new Object[]{e.getMessage()});
                        this.possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
                        stillReading = this.ioConfig.isPauseAfterRead() || !assignment.isEmpty();
                    }
                    SequenceMetadata sequenceToCheckpoint = null;
                    Iterator iterator = records.iterator();
                    while (iterator.hasNext()) {
                        ConsumerRecord record = (ConsumerRecord)iterator.next();
                        if (log.isTraceEnabled()) {
                            log.trace("Got topic[%s] partition[%d] offset[%,d].", new Object[]{record.topic(), record.partition(), record.offset()});
                        }
                        if (record.offset() < this.endOffsets.get(record.partition())) {
                            if (record.offset() != this.nextOffsets.get(record.partition()).longValue()) {
                                if (this.ioConfig.isSkipOffsetGaps()) {
                                    log.warn("Skipped to offset[%,d] after offset[%,d] in partition[%d].", new Object[]{record.offset(), this.nextOffsets.get(record.partition()), record.partition()});
                                } else {
                                    throw new ISE("WTF?! Got offset[%,d] after offset[%,d] in partition[%d].", new Object[]{record.offset(), this.nextOffsets.get(record.partition()), record.partition()});
                                }
                            }
                            try {
                                byte[] valueBytes = (byte[])record.value();
                                List rows = valueBytes == null ? Utils.nullableListOf((Object[])new InputRow[]{null}) : this.parser.parseBatch((Object)ByteBuffer.wrap(valueBytes));
                                boolean isPersistRequired = false;
                                for (InputRow row : rows) {
                                    if (row != null && this.withinMinMaxRecordTime(row)) {
                                        SequenceMetadata sequenceToUse = null;
                                        for (SequenceMetadata sequence : this.sequences) {
                                            if (!sequence.canHandle((ConsumerRecord<byte[], byte[]>)record)) continue;
                                            sequenceToUse = sequence;
                                        }
                                        if (sequenceToUse == null) {
                                            throw new ISE("WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s", new Object[]{record.partition(), record.offset(), this.sequences});
                                        }
                                        AppenderatorDriverAddResult addResult = this.driver.add(row, sequenceToUse.getSequenceName(), committerSupplier, true, false);
                                        if (addResult.isOk()) {
                                            if (addResult.getNumRowsInSegment() > this.tuningConfig.getMaxRowsPerSegment() && !sequenceToUse.isCheckpointed()) {
                                                sequenceToCheckpoint = sequenceToUse;
                                            }
                                            isPersistRequired |= addResult.isPersistRequired();
                                        } else {
                                            throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{row.getTimestamp()});
                                        }
                                        this.fireDepartmentMetrics.incrementProcessed();
                                        continue;
                                    }
                                    this.fireDepartmentMetrics.incrementThrownAway();
                                }
                                if (isPersistRequired) {
                                    Futures.addCallback((ListenableFuture)this.driver.persistAsync((Committer)committerSupplier.get()), (FutureCallback)new FutureCallback<Object>(){

                                        public void onSuccess(@Nullable Object result) {
                                            log.info("Persist completed with metadata [%s]", new Object[]{result});
                                        }

                                        public void onFailure(Throwable t) {
                                            log.error("Persist failed, dying", new Object[0]);
                                            KafkaIndexTask.this.throwableAtomicReference.set(t);
                                        }
                                    });
                                }
                            }
                            catch (ParseException e) {
                                if (this.tuningConfig.isReportParseExceptions()) {
                                    throw e;
                                }
                                log.debug((Throwable)e, "Dropping unparseable row from partition[%d] offset[%,d].", new Object[]{record.partition(), record.offset()});
                                this.fireDepartmentMetrics.incrementUnparseable();
                            }
                            this.nextOffsets.put(record.partition(), record.offset() + 1L);
                        }
                        if (!this.nextOffsets.get(record.partition()).equals(this.endOffsets.get(record.partition())) || !assignment.remove(record.partition())) continue;
                        log.info("Finished reading topic[%s], partition[%,d].", new Object[]{record.topic(), record.partition()});
                        KafkaIndexTask.assignPartitions(consumer, topic, assignment);
                        stillReading = this.ioConfig.isPauseAfterRead() || !assignment.isEmpty();
                    }
                    if (sequenceToCheckpoint == null || this.ioConfig.isPauseAfterRead()) continue;
                    Preconditions.checkArgument((boolean)this.sequences.get(this.sequences.size() - 1).getSequenceName().equals(sequenceToCheckpoint.getSequenceName()), (String)"Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s", (Object[])new Object[]{sequenceToCheckpoint, this.sequences});
                    this.requestPause(-1L);
                    if (((Boolean)toolbox.getTaskActionClient().submit((TaskAction)new CheckPointDataSourceMetadataAction(this.getDataSource(), this.ioConfig.getBaseSequenceName(), (DataSourceMetadata)new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())), (DataSourceMetadata)new KafkaDataSourceMetadata(new KafkaPartitions(topic, this.nextOffsets))))).booleanValue()) continue;
                    throw new ISE("Checkpoint request with offsets [%s] failed, dying", new Object[]{this.nextOffsets});
                }
            }
            finally {
                log.info("Persisting all pending data", new Object[0]);
                this.driver.persist((Committer)committerSupplier.get());
            }
            records = this.statusLock;
            synchronized (records) {
                if (this.stopRequested.get() && !this.publishOnStop.get()) {
                    throw new InterruptedException("Stopping without publishing");
                }
                this.status = Status.PUBLISHING;
            }
            for (SequenceMetadata sequenceMetadata : this.sequences) {
                if (this.publishingSequences.contains(sequenceMetadata.getSequenceName())) continue;
                sequenceMetadata.setEndOffsets(this.nextOffsets);
                sequenceMetadata.updateAssignments(this.nextOffsets);
                this.publishingSequences.add(sequenceMetadata.getSequenceName());
                this.publishQueue.add(sequenceMetadata);
            }
            this.publishQueue.add(SequenceMetadata.getSentinelSequenceMetadata());
            this.waitForPublishes.await();
            this.checkAndMaybeThrowException();
            List handedOffList = Lists.newArrayList();
            if (this.tuningConfig.getHandoffConditionTimeout() == 0L) {
                handedOffList = (List)Futures.allAsList(this.handOffWaitList).get();
            } else {
                try {
                    handedOffList = (List)Futures.allAsList(this.handOffWaitList).get(this.tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException e) {
                    log.makeAlert("Timed out after [%d] millis waiting for handoffs", new Object[]{this.tuningConfig.getHandoffConditionTimeout()}).addData("TaskId", (Object)this.getId()).emit();
                }
            }
            for (SegmentsAndMetadata handedOff : handedOffList) {
                if (handedOff == null) {
                    log.warn("Handoff failed for segments %s", new Object[]{handedOff.getSegments()});
                    continue;
                }
                log.info("Handoff completed for segments[%s] with metadata[%s].", new Object[]{Joiner.on((String)", ").join((Iterable)handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList())), Preconditions.checkNotNull((Object)handedOff.getCommitMetadata(), (Object)"commitMetadata")});
            }
        }
        catch (InterruptedException | RejectedExecutionException e) {
            this.appenderator.closeNow();
            if (e instanceof RejectedExecutionException && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
                throw e;
            }
            if (!this.stopRequested.get()) {
                Thread.currentThread().interrupt();
                throw e;
            }
            log.info("The task was asked to stop before completing", new Object[0]);
        }
        finally {
            if (this.appenderator != null) {
                if (this.throwableAtomicReference.get() != null) {
                    this.appenderator.closeNow();
                } else {
                    this.appenderator.close();
                }
            }
            if (this.driver != null) {
                this.driver.close();
            }
            if (this.chatHandlerProvider.isPresent()) {
                ((ChatHandlerProvider)this.chatHandlerProvider.get()).unregister(this.getId());
            }
            if (this.publishExecService != null) {
                this.publishExecService.shutdownNow();
            }
            toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
            toolbox.getDataSegmentServerAnnouncer().unannounce();
        }
        return this.success();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TaskStatus runLegacy(TaskToolbox toolbox) throws Exception {
        log.info("Starting up!", new Object[0]);
        this.startTime = DateTimes.nowUtc();
        this.status = Status.STARTING;
        this.toolbox = toolbox;
        if (this.chatHandlerProvider.isPresent()) {
            log.info("Found chat handler of class[%s]", new Object[]{((ChatHandlerProvider)this.chatHandlerProvider.get()).getClass().getName()});
            ((ChatHandlerProvider)this.chatHandlerProvider.get()).register(this.getId(), (ChatHandler)this, false);
        } else {
            log.warn("No chat handler detected", new Object[0]);
        }
        this.runThread = Thread.currentThread();
        FireDepartment fireDepartmentForMetrics = new FireDepartment(this.dataSchema, new RealtimeIOConfig(null, null, null), null);
        this.fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
        toolbox.getMonitorScheduler().addMonitor((Monitor)new RealtimeMetricsMonitor((List)ImmutableList.of((Object)fireDepartmentForMetrics), (Map)ImmutableMap.of((Object)"taskId", (Object)new String[]{this.getId()})));
        LookupNodeService lookupNodeService = this.getContextValue("lookupTier") == null ? toolbox.getLookupNodeService() : new LookupNodeService((String)this.getContextValue("lookupTier"));
        DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(toolbox.getDruidNode(), "peon", (Map)ImmutableMap.of((Object)toolbox.getDataNodeService().getName(), (Object)toolbox.getDataNodeService(), (Object)lookupNodeService.getName(), (Object)lookupNodeService));
        try (Appenderator appenderator0 = this.newAppenderator(this.fireDepartmentMetrics, toolbox);
             StreamAppenderatorDriver driver = this.newDriver(appenderator0, toolbox, this.fireDepartmentMetrics);
             KafkaConsumer<byte[], byte[]> consumer = this.newConsumer();){
            Object records;
            toolbox.getDataSegmentServerAnnouncer().announce();
            toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
            this.appenderator = appenderator0;
            String topic = this.ioConfig.getStartPartitions().getTopic();
            Object restoredMetadata = driver.startJob();
            if (restoredMetadata == null) {
                this.nextOffsets.putAll(this.ioConfig.getStartPartitions().getPartitionOffsetMap());
            } else {
                Map restoredMetadataMap = (Map)restoredMetadata;
                KafkaPartitions restoredNextPartitions = (KafkaPartitions)toolbox.getObjectMapper().convertValue(restoredMetadataMap.get(METADATA_NEXT_PARTITIONS), KafkaPartitions.class);
                this.nextOffsets.putAll(restoredNextPartitions.getPartitionOffsetMap());
                if (!restoredNextPartitions.getTopic().equals(this.ioConfig.getStartPartitions().getTopic())) {
                    throw new ISE("WTF?! Restored topic[%s] but expected topic[%s]", new Object[]{restoredNextPartitions.getTopic(), this.ioConfig.getStartPartitions().getTopic()});
                }
                if (!this.nextOffsets.keySet().equals(this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) {
                    throw new ISE("WTF?! Restored partitions[%s] but expected partitions[%s]", new Object[]{this.nextOffsets.keySet(), this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()});
                }
            }
            HashMap sequenceNames = Maps.newHashMap();
            for (Integer partitionNum : this.nextOffsets.keySet()) {
                sequenceNames.put(partitionNum, StringUtils.format((String)"%s_%s", (Object[])new Object[]{this.ioConfig.getBaseSequenceName(), partitionNum}));
            }
            Supplier<Committer> committerSupplier = new Supplier<Committer>(){

                public Committer get() {
                    ImmutableMap snapshot = ImmutableMap.copyOf((Map)KafkaIndexTask.this.nextOffsets);
                    return new Committer((Map)snapshot){
                        final /* synthetic */ Map val$snapshot;
                        {
                            this.val$snapshot = map;
                        }

                        public Object getMetadata() {
                            return ImmutableMap.of((Object)KafkaIndexTask.METADATA_NEXT_PARTITIONS, (Object)new KafkaPartitions(KafkaIndexTask.this.ioConfig.getStartPartitions().getTopic(), this.val$snapshot));
                        }

                        public void run() {
                        }
                    };
                }
            };
            Set<Integer> assignment = this.assignPartitionsAndSeekToNext(consumer, topic);
            boolean stillReading = !assignment.isEmpty();
            this.status = Status.READING;
            try {
                while (stillReading) {
                    if (this.possiblyPause(assignment) && (assignment = this.assignPartitionsAndSeekToNext(consumer, topic)).isEmpty()) {
                        log.info("All partitions have been fully read", new Object[0]);
                        this.publishOnStop.set(true);
                        this.stopRequested.set(true);
                    }
                    if (this.stopRequested.get()) {
                        break;
                    }
                    records = ConsumerRecords.empty();
                    try {
                        records = consumer.poll(100L);
                    }
                    catch (OffsetOutOfRangeException e) {
                        log.warn("OffsetOutOfRangeException with message [%s]", new Object[]{e.getMessage()});
                        this.possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
                        stillReading = this.ioConfig.isPauseAfterRead() || !assignment.isEmpty();
                    }
                    for (ConsumerRecord record : records) {
                        if (log.isTraceEnabled()) {
                            log.trace("Got topic[%s] partition[%d] offset[%,d].", new Object[]{record.topic(), record.partition(), record.offset()});
                        }
                        if (record.offset() < this.endOffsets.get(record.partition())) {
                            if (record.offset() != this.nextOffsets.get(record.partition()).longValue()) {
                                if (this.ioConfig.isSkipOffsetGaps()) {
                                    log.warn("Skipped to offset[%,d] after offset[%,d] in partition[%d].", new Object[]{record.offset(), this.nextOffsets.get(record.partition()), record.partition()});
                                } else {
                                    throw new ISE("WTF?! Got offset[%,d] after offset[%,d] in partition[%d].", new Object[]{record.offset(), this.nextOffsets.get(record.partition()), record.partition()});
                                }
                            }
                            try {
                                byte[] valueBytes = (byte[])record.value();
                                List rows = valueBytes == null ? Utils.nullableListOf((Object[])new InputRow[]{null}) : this.parser.parseBatch((Object)ByteBuffer.wrap(valueBytes));
                                boolean isPersistRequired = false;
                                HashMap<String, Set> segmentsToMoveOut = new HashMap<String, Set>();
                                for (InputRow row : rows) {
                                    if (row != null && this.withinMinMaxRecordTime(row)) {
                                        String sequenceName = (String)sequenceNames.get(record.partition());
                                        AppenderatorDriverAddResult addResult = driver.add(row, sequenceName, (Supplier)committerSupplier, false, false);
                                        if (addResult.isOk()) {
                                            if (addResult.getNumRowsInSegment() > this.tuningConfig.getMaxRowsPerSegment()) {
                                                segmentsToMoveOut.computeIfAbsent(sequenceName, k -> new HashSet()).add(addResult.getSegmentIdentifier());
                                            }
                                            isPersistRequired |= addResult.isPersistRequired();
                                        } else {
                                            throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{row.getTimestamp()});
                                        }
                                        this.fireDepartmentMetrics.incrementProcessed();
                                        continue;
                                    }
                                    this.fireDepartmentMetrics.incrementThrownAway();
                                }
                                if (isPersistRequired) {
                                    driver.persist((Committer)committerSupplier.get());
                                }
                                segmentsToMoveOut.entrySet().forEach(sequenceSegments -> driver.moveSegmentOut((String)sequenceSegments.getKey(), ((Set)sequenceSegments.getValue()).stream().collect(Collectors.toList())));
                            }
                            catch (ParseException e) {
                                if (this.tuningConfig.isReportParseExceptions()) {
                                    throw e;
                                }
                                log.debug((Throwable)e, "Dropping unparseable row from partition[%d] offset[%,d].", new Object[]{record.partition(), record.offset()});
                                this.fireDepartmentMetrics.incrementUnparseable();
                            }
                            this.nextOffsets.put(record.partition(), record.offset() + 1L);
                        }
                        if (!this.nextOffsets.get(record.partition()).equals(this.endOffsets.get(record.partition())) || !assignment.remove(record.partition())) continue;
                        log.info("Finished reading topic[%s], partition[%,d].", new Object[]{record.topic(), record.partition()});
                        KafkaIndexTask.assignPartitions(consumer, topic, assignment);
                        stillReading = this.ioConfig.isPauseAfterRead() || !assignment.isEmpty();
                    }
                }
            }
            finally {
                driver.persist((Committer)committerSupplier.get());
            }
            records = this.statusLock;
            synchronized (records) {
                if (this.stopRequested.get() && !this.publishOnStop.get()) {
                    throw new InterruptedException("Stopping without publishing");
                }
                this.status = Status.PUBLISHING;
            }
            TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
                KafkaPartitions finalPartitions = (KafkaPartitions)toolbox.getObjectMapper().convertValue(((Map)Preconditions.checkNotNull((Object)commitMetadata, (Object)"commitMetadata")).get(METADATA_NEXT_PARTITIONS), KafkaPartitions.class);
                if (!this.endOffsets.equals(finalPartitions.getPartitionOffsetMap())) {
                    throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", new Object[]{commitMetadata});
                }
                SegmentTransactionalInsertAction action = this.ioConfig.isUseTransaction() ? new SegmentTransactionalInsertAction(segments, (DataSourceMetadata)new KafkaDataSourceMetadata(this.ioConfig.getStartPartitions()), (DataSourceMetadata)new KafkaDataSourceMetadata(finalPartitions)) : new SegmentTransactionalInsertAction(segments, null, null);
                log.info("Publishing with isTransaction[%s].", new Object[]{this.ioConfig.isUseTransaction()});
                return ((SegmentPublishResult)toolbox.getTaskActionClient().submit((TaskAction)action)).isSuccess();
            };
            SegmentsAndMetadata published = (SegmentsAndMetadata)driver.publish(publisher, (Committer)committerSupplier.get(), sequenceNames.values()).get();
            ListenableFuture handoffFuture = driver.registerHandoff(published);
            SegmentsAndMetadata handedOff = this.tuningConfig.getHandoffConditionTimeout() == 0L ? (SegmentsAndMetadata)handoffFuture.get() : (SegmentsAndMetadata)handoffFuture.get(this.tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
            if (handedOff == null) {
                throw new ISE("Transaction failure publishing segments, aborting", new Object[0]);
            }
            log.info("Published segments[%s] with metadata[%s].", new Object[]{Joiner.on((String)", ").join(Iterables.transform((Iterable)handedOff.getSegments(), (Function)new Function<DataSegment, String>(){

                public String apply(DataSegment input) {
                    return input.getIdentifier();
                }
            })), Preconditions.checkNotNull((Object)handedOff.getCommitMetadata(), (Object)"commitMetadata")});
        }
        catch (InterruptedException | RejectedExecutionException e) {
            if (e instanceof RejectedExecutionException && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
                throw e;
            }
            if (!this.stopRequested.get()) {
                Thread.currentThread().interrupt();
                throw e;
            }
            log.info("The task was asked to stop before completing", new Object[0]);
        }
        finally {
            if (this.chatHandlerProvider.isPresent()) {
                ((ChatHandlerProvider)this.chatHandlerProvider.get()).unregister(this.getId());
            }
            toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
            toolbox.getDataSegmentServerAnnouncer().unannounce();
        }
        return this.success();
    }

    private void checkAndMaybeThrowException() {
        if (this.throwableAtomicReference.get() != null) {
            Throwables.propagate((Throwable)this.throwableAtomicReference.get());
        }
    }

    private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier) throws InterruptedException {
        for (SequenceMetadata sequenceMetadata : this.sequences) {
            sequenceMetadata.updateAssignments(this.nextOffsets);
            if (sequenceMetadata.isOpen() || this.publishingSequences.contains(sequenceMetadata.getSequenceName())) continue;
            this.publishingSequences.add(sequenceMetadata.getSequenceName());
            try {
                Object result = this.driver.persist((Committer)committerSupplier.get());
                log.info("Persist completed with results: [%s], adding sequence [%s] to publish queue", new Object[]{result, sequenceMetadata});
                this.publishQueue.add(sequenceMetadata);
            }
            catch (InterruptedException e) {
                log.warn("Interrupted while persisting sequence [%s]", new Object[]{sequenceMetadata});
                throw e;
            }
        }
    }

    private void restoreSequences() throws IOException {
        Preconditions.checkNotNull((Object)this.sequencesPersistFile);
        if (this.sequencesPersistFile.exists()) {
            this.sequences = new CopyOnWriteArrayList((Collection)this.toolbox.getObjectMapper().readValue(this.sequencesPersistFile, (TypeReference)new TypeReference<List<SequenceMetadata>>(){}));
        }
    }

    private synchronized void persistSequences() throws IOException {
        log.info("Persisting Sequences Metadata [%s]", new Object[]{this.sequences});
        this.toolbox.getObjectMapper().writerWithType((TypeReference)new TypeReference<List<SequenceMetadata>>(){}).writeValue(this.sequencesPersistFile, this.sequences);
    }

    public boolean canRestore() {
        return true;
    }

    private Access authorizationCheck(HttpServletRequest req, Action action) {
        ResourceAction resourceAction = new ResourceAction(new Resource(this.dataSchema.getDataSource(), ResourceType.DATASOURCE), action);
        Access access = AuthorizationUtils.authorizeResourceAction((HttpServletRequest)req, (ResourceAction)resourceAction, (AuthorizerMapper)this.authorizerMapper);
        if (!access.isAllowed()) {
            throw new ForbiddenException(access.toString());
        }
        return access;
    }

    @VisibleForTesting
    Appenderator getAppenderator() {
        return this.appenderator;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void stopGracefully() {
        log.info("Stopping gracefully (status: [%s])", new Object[]{this.status});
        this.stopRequested.set(true);
        Object object = this.statusLock;
        synchronized (object) {
            if (this.status == Status.PUBLISHING) {
                this.runThread.interrupt();
                return;
            }
        }
        try {
            if (!this.pauseLock.tryLock(15L, TimeUnit.SECONDS)) {
                log.warn("While stopping: failed to acquire pauseLock before timeout, interrupting run thread", new Object[0]);
                this.runThread.interrupt();
                return;
            }
            try {
                if (this.pauseRequested) {
                    this.pauseRequested = false;
                    this.shouldResume.signalAll();
                }
            }
            finally {
                this.pauseLock.unlock();
            }
            if (!this.pollRetryLock.tryLock(15L, TimeUnit.SECONDS)) {
                log.warn("While stopping: failed to acquire pollRetryLock before timeout, interrupting run thread", new Object[0]);
                this.runThread.interrupt();
                return;
            }
            try {
                this.isAwaitingRetry.signalAll();
                return;
            }
            finally {
                this.pollRetryLock.unlock();
            }
        }
        catch (Exception e) {
            Throwables.propagate((Throwable)e);
        }
    }

    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        if (this.appenderator == null) {
            return new NoopQueryRunner();
        }
        return new QueryRunner<T>(){

            public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext) {
                return queryPlus.run((QuerySegmentWalker)KafkaIndexTask.this.appenderator, responseContext);
            }
        };
    }

    @POST
    @Path(value="/stop")
    public Response stop(@Context HttpServletRequest req) {
        this.authorizationCheck(req, Action.WRITE);
        this.stopGracefully();
        return Response.status((Response.Status)Response.Status.OK).build();
    }

    @GET
    @Path(value="/status")
    @Produces(value={"application/json"})
    public Status getStatusHTTP(@Context HttpServletRequest req) {
        this.authorizationCheck(req, Action.READ);
        return this.status;
    }

    public Status getStatus() {
        return this.status;
    }

    @GET
    @Path(value="/offsets/current")
    @Produces(value={"application/json"})
    public Map<Integer, Long> getCurrentOffsets(@Context HttpServletRequest req) {
        this.authorizationCheck(req, Action.READ);
        return this.getCurrentOffsets();
    }

    public Map<Integer, Long> getCurrentOffsets() {
        return this.nextOffsets;
    }

    @GET
    @Path(value="/offsets/end")
    @Produces(value={"application/json"})
    public Map<Integer, Long> getEndOffsetsHTTP(@Context HttpServletRequest req) {
        this.authorizationCheck(req, Action.READ);
        return this.getEndOffsets();
    }

    public Map<Integer, Long> getEndOffsets() {
        return this.endOffsets;
    }

    @POST
    @Path(value="/offsets/end")
    @Consumes(value={"application/json"})
    @Produces(value={"application/json"})
    public Response setEndOffsetsHTTP(Map<Integer, Long> offsets, @QueryParam(value="resume") @DefaultValue(value="false") boolean resume, @QueryParam(value="finish") @DefaultValue(value="true") boolean finish, @Context HttpServletRequest req) throws InterruptedException {
        this.authorizationCheck(req, Action.WRITE);
        return this.setEndOffsets(offsets, resume, finish);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Response setEndOffsets(Map<Integer, Long> offsets, boolean resume, boolean finish) throws InterruptedException {
        if (this.useLegacy) {
            return this.setEndOffsetsLegacy(offsets, resume);
        }
        if (offsets == null) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)"Request body must contain a map of { partition:endOffset }").build();
        }
        if (!this.endOffsets.keySet().containsAll(offsets.keySet())) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)StringUtils.format((String)"Request contains partitions not being handled by this task, my partitions: %s", (Object[])new Object[]{this.endOffsets.keySet()})).build();
        }
        try {
            this.pauseLock.lockInterruptibly();
            Preconditions.checkState((this.sequences.size() > 0 ? 1 : 0) != 0, (Object)"WTH?! No Sequences found to set end offsets");
            SequenceMetadata latestSequence = this.sequences.get(this.sequences.size() - 1);
            if (latestSequence.getStartOffsets().equals(offsets) && !finish || latestSequence.getEndOffsets().equals(offsets) && finish) {
                log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", new Object[]{this.sequences});
                Response response = Response.ok(offsets).build();
                return response;
            }
            if (latestSequence.isCheckpointed() && !this.ioConfig.isPauseAfterRead()) {
                Response response = Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)StringUtils.format((String)"WTH?! Sequence [%s] has already endOffsets set, cannot set to [%s]", (Object[])new Object[]{latestSequence, offsets})).build();
                return response;
            }
            if (!this.isPaused()) {
                Response response = Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)"Task must be paused before changing the end offsets").build();
                return response;
            }
            for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
                if (entry.getValue().compareTo(this.nextOffsets.get(entry.getKey())) >= 0) continue;
                Response response = Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)StringUtils.format((String)"End offset must be >= current offset for partition [%s] (current: %s)", (Object[])new Object[]{entry.getKey(), this.nextOffsets.get(entry.getKey())})).build();
                return response;
            }
            latestSequence.setEndOffsets(offsets);
            if (finish) {
                log.info("Updating endOffsets from [%s] to [%s]", new Object[]{this.endOffsets, offsets});
                this.endOffsets.putAll(offsets);
            } else {
                Preconditions.checkState((!this.ioConfig.isPauseAfterRead() ? 1 : 0) != 0);
                SequenceMetadata newSequence = new SequenceMetadata(latestSequence.getSequenceId() + 1, StringUtils.format((String)"%s_%d", (Object[])new Object[]{this.ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1}), offsets, this.maxEndOffsets, false);
                this.sequences.add(newSequence);
            }
            this.persistSequences();
        }
        catch (Exception e) {
            log.error((Throwable)e, "Unable to set end offsets, dying", new Object[0]);
            this.throwableAtomicReference.set(e);
            Throwables.propagate((Throwable)e);
        }
        finally {
            this.pauseLock.unlock();
        }
        if (resume) {
            this.resume();
        }
        return Response.ok(offsets).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Response setEndOffsetsLegacy(Map<Integer, Long> offsets, boolean resume) throws InterruptedException {
        if (offsets == null) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)"Request body must contain a map of { partition:endOffset }").build();
        }
        if (!this.endOffsets.keySet().containsAll(offsets.keySet())) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)StringUtils.format((String)"Request contains partitions not being handled by this task, my partitions: %s", (Object[])new Object[]{this.endOffsets.keySet()})).build();
        }
        this.pauseLock.lockInterruptibly();
        try {
            if (!this.isPaused()) {
                Response response = Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)"Task must be paused before changing the end offsets").build();
                return response;
            }
            for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
                if (entry.getValue().compareTo(this.nextOffsets.get(entry.getKey())) >= 0) continue;
                Response response = Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)StringUtils.format((String)"End offset must be >= current offset for partition [%s] (current: %s)", (Object[])new Object[]{entry.getKey(), this.nextOffsets.get(entry.getKey())})).build();
                return response;
            }
            this.endOffsets.putAll(offsets);
            log.info("endOffsets changed to %s", new Object[]{this.endOffsets});
        }
        finally {
            this.pauseLock.unlock();
        }
        if (resume) {
            this.resume();
        }
        return Response.ok(this.endOffsets).build();
    }

    @GET
    @Path(value="/checkpoints")
    @Produces(value={"application/json"})
    public Map<Integer, Map<Integer, Long>> getCheckpointsHTTP(@Context HttpServletRequest req) {
        this.authorizationCheck(req, Action.READ);
        return this.getCheckpoints();
    }

    public Map<Integer, Map<Integer, Long>> getCheckpoints() {
        TreeMap<Integer, Map<Integer, Long>> result = new TreeMap<Integer, Map<Integer, Long>>();
        result.putAll(this.sequences.stream().collect(Collectors.toMap(SequenceMetadata::getSequenceId, SequenceMetadata::getStartOffsets)));
        return result;
    }

    @POST
    @Path(value="/pause")
    @Produces(value={"application/json"})
    public Response pauseHTTP(@QueryParam(value="timeout") @DefaultValue(value="0") long timeout, @Context HttpServletRequest req) throws InterruptedException {
        this.authorizationCheck(req, Action.WRITE);
        return this.pause(timeout);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Response pause(long timeout) throws InterruptedException {
        if (this.status != Status.PAUSED && this.status != Status.READING) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)StringUtils.format((String)"Can't pause, task is not in a pausable state (state: [%s])", (Object[])new Object[]{this.status})).build();
        }
        this.pauseLock.lockInterruptibly();
        try {
            this.pauseMillis = timeout <= 0L ? -1L : timeout;
            this.pauseRequested = true;
            this.pollRetryLock.lockInterruptibly();
            try {
                this.isAwaitingRetry.signalAll();
            }
            finally {
                this.pollRetryLock.unlock();
            }
            if (this.isPaused()) {
                this.shouldResume.signalAll();
            }
            long nanos = TimeUnit.SECONDS.toNanos(2L);
            while (!this.isPaused()) {
                if (nanos <= 0L) {
                    Response response = Response.status((Response.Status)Response.Status.ACCEPTED).entity((Object)"Request accepted but task has not yet paused").build();
                    return response;
                }
                nanos = this.hasPaused.awaitNanos(nanos);
            }
        }
        finally {
            this.pauseLock.unlock();
        }
        try {
            return Response.ok().entity((Object)this.toolbox.getObjectMapper().writeValueAsString(this.getCurrentOffsets())).build();
        }
        catch (JsonProcessingException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    @POST
    @Path(value="/resume")
    public Response resumeHTTP(@Context HttpServletRequest req) throws InterruptedException {
        this.authorizationCheck(req, Action.WRITE);
        this.resume();
        return Response.status((Response.Status)Response.Status.OK).build();
    }

    public void resume() throws InterruptedException {
        this.pauseLock.lockInterruptibly();
        try {
            this.pauseRequested = false;
            this.shouldResume.signalAll();
            long nanos = TimeUnit.SECONDS.toNanos(5L);
            while (this.isPaused()) {
                if (nanos <= 0L) {
                    throw new RuntimeException("Resume command was not accepted within 5 seconds");
                }
                nanos = this.shouldResume.awaitNanos(nanos);
            }
        }
        finally {
            this.pauseLock.unlock();
        }
    }

    @GET
    @Path(value="/time/start")
    @Produces(value={"application/json"})
    public DateTime getStartTime(@Context HttpServletRequest req) {
        this.authorizationCheck(req, Action.WRITE);
        return this.startTime;
    }

    @VisibleForTesting
    FireDepartmentMetrics getFireDepartmentMetrics() {
        return this.fireDepartmentMetrics;
    }

    private boolean isPaused() {
        return this.status == Status.PAUSED;
    }

    private void requestPause(long pauseMillis) {
        this.pauseMillis = pauseMillis;
        this.pauseRequested = true;
    }

    private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) {
        return Appenderators.createRealtime((DataSchema)this.dataSchema, (AppenderatorConfig)this.tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), (FireDepartmentMetrics)metrics, (DataSegmentPusher)toolbox.getSegmentPusher(), (ObjectMapper)toolbox.getObjectMapper(), (IndexIO)toolbox.getIndexIO(), (IndexMerger)toolbox.getIndexMergerV9(), (QueryRunnerFactoryConglomerate)toolbox.getQueryRunnerFactoryConglomerate(), (DataSegmentAnnouncer)toolbox.getSegmentAnnouncer(), (ServiceEmitter)toolbox.getEmitter(), (ExecutorService)toolbox.getQueryExecutorService(), (Cache)toolbox.getCache(), (CacheConfig)toolbox.getCacheConfig());
    }

    private StreamAppenderatorDriver newDriver(Appenderator appenderator, TaskToolbox toolbox, FireDepartmentMetrics metrics) {
        return new StreamAppenderatorDriver(appenderator, (SegmentAllocator)new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), this.dataSchema), toolbox.getSegmentHandoffNotifierFactory(), (UsedSegmentChecker)new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), toolbox.getObjectMapper(), metrics);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private KafkaConsumer<byte[], byte[]> newConsumer() {
        ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(((Object)((Object)this)).getClass().getClassLoader());
            Properties props = new Properties();
            for (Map.Entry<String, String> entry : this.ioConfig.getConsumerProperties().entrySet()) {
                props.setProperty(entry.getKey(), entry.getValue());
            }
            props.setProperty("enable.auto.commit", "false");
            props.setProperty("auto.offset.reset", "none");
            props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
            props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
            KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
            return kafkaConsumer;
        }
        finally {
            Thread.currentThread().setContextClassLoader(currCtxCl);
        }
    }

    private static void assignPartitions(KafkaConsumer consumer, String topic, Set<Integer> partitions) {
        consumer.assign((Collection)Lists.newArrayList((Iterable)partitions.stream().map(n -> new TopicPartition(topic, n.intValue())).collect(Collectors.toList())));
    }

    private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic) {
        HashSet assignment = Sets.newHashSet();
        for (Map.Entry<Integer, Long> entry : this.nextOffsets.entrySet()) {
            long endOffset = this.endOffsets.get(entry.getKey());
            if (entry.getValue() < endOffset) {
                assignment.add(entry.getKey());
                continue;
            }
            if (entry.getValue() == endOffset) {
                log.info("Finished reading partition[%d].", new Object[]{entry.getKey()});
                continue;
            }
            throw new ISE("WTF?! Cannot start from offset[%,d] > endOffset[%,d]", new Object[]{entry.getValue(), endOffset});
        }
        KafkaIndexTask.assignPartitions(consumer, topic, assignment);
        Iterator<Map.Entry<Integer, Long>> iterator = assignment.iterator();
        while (iterator.hasNext()) {
            int partition = (Integer)((Object)iterator.next());
            long offset = this.nextOffsets.get(partition);
            log.info("Seeking partition[%d] to offset[%,d].", new Object[]{partition, offset});
            consumer.seek(new TopicPartition(topic, partition), offset);
        }
        return assignment;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean possiblyPause(Set<Integer> assignment) throws InterruptedException {
        this.pauseLock.lockInterruptibly();
        try {
            if (this.ioConfig.isPauseAfterRead() && assignment.isEmpty()) {
                this.pauseMillis = -1L;
                this.pauseRequested = true;
            }
            if (this.pauseRequested) {
                this.status = Status.PAUSED;
                long nanos = 0L;
                this.hasPaused.signalAll();
                while (this.pauseRequested) {
                    if (this.pauseMillis == -1L) {
                        log.info("Pausing ingestion until resumed", new Object[0]);
                        this.shouldResume.await();
                        continue;
                    }
                    if (this.pauseMillis > 0L) {
                        log.info("Pausing ingestion for [%,d] ms", new Object[]{this.pauseMillis});
                        nanos = TimeUnit.MILLISECONDS.toNanos(this.pauseMillis);
                        this.pauseMillis = 0L;
                    }
                    if (nanos <= 0L) {
                        this.pauseRequested = false;
                    }
                    nanos = this.shouldResume.awaitNanos(nanos);
                }
                this.status = Status.READING;
                this.shouldResume.signalAll();
                log.info("Ingestion loop resumed", new Object[0]);
                boolean bl = true;
                return bl;
            }
        }
        finally {
            this.pauseLock.unlock();
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void possiblyResetOffsetsOrWait(Map<TopicPartition, Long> outOfRangePartitions, KafkaConsumer<byte[], byte[]> consumer, TaskToolbox taskToolbox) throws InterruptedException, IOException {
        HashMap resetPartitions = Maps.newHashMap();
        boolean doReset = false;
        if (this.tuningConfig.isResetOffsetAutomatically()) {
            for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
                TopicPartition topicPartition = outOfRangePartition.getKey();
                long nextOffset = outOfRangePartition.getValue();
                consumer.seekToBeginning(Collections.singletonList(topicPartition));
                long leastAvailableOffset = consumer.position(topicPartition);
                consumer.seek(topicPartition, nextOffset);
                if (leastAvailableOffset <= nextOffset) continue;
                doReset = true;
                resetPartitions.put(topicPartition, nextOffset);
            }
        }
        if (doReset) {
            this.sendResetRequestAndWait(resetPartitions, taskToolbox);
        } else {
            log.warn("Retrying in %dms", new Object[]{this.pollRetryMs});
            this.pollRetryLock.lockInterruptibly();
            try {
                long nanos = TimeUnit.MILLISECONDS.toNanos(this.pollRetryMs);
                while (nanos > 0L && !this.pauseRequested && !this.stopRequested.get()) {
                    nanos = this.isAwaitingRetry.awaitNanos(nanos);
                }
            }
            finally {
                this.pollRetryLock.unlock();
            }
        }
    }

    private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox) throws IOException {
        HashMap partitionOffsetMap = Maps.newHashMap();
        for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
            partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
        }
        boolean result = (Boolean)taskToolbox.getTaskActionClient().submit((TaskAction)new ResetDataSourceMetadataAction(this.getDataSource(), (DataSourceMetadata)new KafkaDataSourceMetadata(new KafkaPartitions(this.ioConfig.getStartPartitions().getTopic(), partitionOffsetMap))));
        if (result) {
            log.makeAlert("Resetting Kafka offsets for datasource [%s]", new Object[]{this.getDataSource()}).addData("partitions", partitionOffsetMap.keySet()).emit();
            this.requestPause(-1L);
        } else {
            log.makeAlert("Failed to send reset request for partitions [%s]", new Object[]{partitionOffsetMap.keySet()}).emit();
        }
    }

    private boolean withinMinMaxRecordTime(InputRow row) {
        boolean afterMaximumMessageTime;
        boolean beforeMinimumMessageTime = this.ioConfig.getMinimumMessageTime().isPresent() && ((DateTime)this.ioConfig.getMinimumMessageTime().get()).isAfter((ReadableInstant)row.getTimestamp());
        boolean bl = afterMaximumMessageTime = this.ioConfig.getMaximumMessageTime().isPresent() && ((DateTime)this.ioConfig.getMaximumMessageTime().get()).isBefore((ReadableInstant)row.getTimestamp());
        if (!Intervals.ETERNITY.contains((ReadableInstant)row.getTimestamp())) {
            String errorMsg = StringUtils.format((String)"Encountered row with timestamp that cannot be represented as a long: [%s]", (Object[])new Object[]{row});
            log.debug(errorMsg, new Object[0]);
            if (this.tuningConfig.isReportParseExceptions()) {
                throw new ParseException(errorMsg, new Object[0]);
            }
            return false;
        }
        if (log.isDebugEnabled()) {
            if (beforeMinimumMessageTime) {
                log.debug("CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", new Object[]{row.getTimestamp(), this.ioConfig.getMinimumMessageTime().get()});
            } else if (afterMaximumMessageTime) {
                log.debug("CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", new Object[]{row.getTimestamp(), this.ioConfig.getMaximumMessageTime().get()});
            }
        }
        return !beforeMinimumMessageTime && !afterMaximumMessageTime;
    }

    private static class SequenceMetadata {
        private final int sequenceId;
        private final String sequenceName;
        private final Map<Integer, Long> startOffsets;
        private final Map<Integer, Long> endOffsets;
        private final Set<Integer> assignments;
        private final boolean sentinel;
        private volatile boolean checkpointed;

        @JsonCreator
        public SequenceMetadata(@JsonProperty(value="sequenceId") int sequenceId, @JsonProperty(value="sequenceName") String sequenceName, @JsonProperty(value="startOffsets") Map<Integer, Long> startOffsets, @JsonProperty(value="endOffsets") Map<Integer, Long> endOffsets, @JsonProperty(value="checkpointed") boolean checkpointed) {
            Preconditions.checkNotNull((Object)sequenceName);
            Preconditions.checkNotNull(startOffsets);
            Preconditions.checkNotNull(endOffsets);
            this.sequenceId = sequenceId;
            this.sequenceName = sequenceName;
            this.startOffsets = ImmutableMap.copyOf(startOffsets);
            this.endOffsets = Maps.newHashMap(endOffsets);
            this.assignments = Sets.newHashSet(startOffsets.keySet());
            this.checkpointed = checkpointed;
            this.sentinel = false;
        }

        @JsonProperty
        public int getSequenceId() {
            return this.sequenceId;
        }

        @JsonProperty
        public boolean isCheckpointed() {
            return this.checkpointed;
        }

        @JsonProperty
        public String getSequenceName() {
            return this.sequenceName;
        }

        @JsonProperty
        public Map<Integer, Long> getStartOffsets() {
            return this.startOffsets;
        }

        @JsonProperty
        public Map<Integer, Long> getEndOffsets() {
            return this.endOffsets;
        }

        @JsonProperty
        public boolean isSentinel() {
            return this.sentinel;
        }

        public void setEndOffsets(Map<Integer, Long> newEndOffsets) {
            this.endOffsets.putAll(newEndOffsets);
            this.checkpointed = true;
        }

        public void updateAssignments(Map<Integer, Long> nextPartitionOffset) {
            this.assignments.clear();
            nextPartitionOffset.entrySet().forEach(partitionOffset -> {
                if (Longs.compare((long)this.endOffsets.get(partitionOffset.getKey()), (long)((Long)nextPartitionOffset.get(partitionOffset.getKey()))) > 0) {
                    this.assignments.add((Integer)partitionOffset.getKey());
                }
            });
        }

        public boolean isOpen() {
            return !this.assignments.isEmpty();
        }

        boolean canHandle(ConsumerRecord<byte[], byte[]> record) {
            return this.isOpen() && this.endOffsets.get(record.partition()) != null && record.offset() >= this.startOffsets.get(record.partition()) && record.offset() < this.endOffsets.get(record.partition());
        }

        private SequenceMetadata() {
            this.sequenceId = -1;
            this.sequenceName = null;
            this.startOffsets = null;
            this.endOffsets = null;
            this.assignments = null;
            this.checkpointed = true;
            this.sentinel = true;
        }

        public static SequenceMetadata getSentinelSequenceMetadata() {
            return new SequenceMetadata();
        }

        public String toString() {
            return "SequenceMetadata{sequenceName='" + this.sequenceName + '\'' + ", sequenceId=" + this.sequenceId + ", startOffsets=" + this.startOffsets + ", endOffsets=" + this.endOffsets + ", assignments=" + this.assignments + ", sentinel=" + this.sentinel + ", checkpointed=" + this.checkpointed + '}';
        }

        public Supplier<Committer> getCommitterSupplier(final String topic, final Map<Integer, Long> lastPersistedOffsets) {
            return () -> new Committer(){

                public Object getMetadata() {
                    Preconditions.checkState((boolean)assignments.isEmpty(), (String)"This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer", (Object[])new Object[]{endOffsets});
                    for (Map.Entry partitionOffset : endOffsets.entrySet()) {
                        lastPersistedOffsets.put(partitionOffset.getKey(), Math.max((Long)partitionOffset.getValue(), lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)));
                    }
                    return ImmutableMap.of((Object)KafkaIndexTask.METADATA_NEXT_PARTITIONS, (Object)new KafkaPartitions(topic, lastPersistedOffsets), (Object)KafkaIndexTask.METADATA_PUBLISH_PARTITIONS, (Object)new KafkaPartitions(topic, endOffsets));
                }

                public void run() {
                }
            };
        }

        public TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTransaction) {
            return (segments, commitMetadata) -> {
                KafkaPartitions finalPartitions = (KafkaPartitions)toolbox.getObjectMapper().convertValue(((Map)Preconditions.checkNotNull((Object)commitMetadata, (Object)"commitMetadata")).get(KafkaIndexTask.METADATA_PUBLISH_PARTITIONS), KafkaPartitions.class);
                if (!this.getEndOffsets().equals(finalPartitions.getPartitionOffsetMap())) {
                    throw new ISE("WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].", new Object[]{this.toString(), commitMetadata});
                }
                SegmentTransactionalInsertAction action = useTransaction ? new SegmentTransactionalInsertAction(segments, (DataSourceMetadata)new KafkaDataSourceMetadata(new KafkaPartitions(finalPartitions.getTopic(), this.getStartOffsets())), (DataSourceMetadata)new KafkaDataSourceMetadata(finalPartitions)) : new SegmentTransactionalInsertAction(segments, null, null);
                log.info("Publishing with isTransaction[%s].", new Object[]{useTransaction});
                return ((SegmentPublishResult)toolbox.getTaskActionClient().submit((TaskAction)action)).isSuccess();
            };
        }
    }

    public static enum Status {
        NOT_STARTED,
        STARTING,
        READING,
        PAUSED,
        PUBLISHING;

    }
}

