/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorNodePunctuator;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.PunctuationQueue;
import org.apache.kafka.streams.processor.internals.PunctuationSchedule;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StampedRecord;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;

public class StreamTask
extends AbstractTask
implements ProcessorNodePunctuator {
    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord("__null_topic__", -1, -1L, null, null);
    private final PartitionGroup partitionGroup;
    private final PartitionGroup.RecordInfo recordInfo;
    private final PunctuationQueue streamTimePunctuationQueue;
    private final PunctuationQueue systemTimePunctuationQueue;
    private final Map<TopicPartition, Long> consumedOffsets;
    private final RecordCollector recordCollector;
    private final Producer<byte[], byte[]> producer;
    private final int maxBufferedSize;
    private boolean commitRequested = false;
    private boolean commitOffsetNeeded = false;
    private boolean transactionInFlight = false;
    private final Time time;
    private final TaskMetrics metrics;

    public StreamTask(TaskId id, Collection<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetrics metrics, StateDirectory stateDirectory, ThreadCache cache, Time time, Producer<byte[], byte[]> producer) {
        super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config);
        this.time = time;
        this.producer = producer;
        this.metrics = new TaskMetrics(metrics);
        ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler();
        this.recordCollector = this.createRecordCollector(this.logContext, productionExceptionHandler);
        this.streamTimePunctuationQueue = new PunctuationQueue();
        this.systemTimePunctuationQueue = new PunctuationQueue();
        this.maxBufferedSize = config.getInt("buffered.records.per.partition");
        this.consumedOffsets = new HashMap<TopicPartition, Long>();
        HashMap<TopicPartition, RecordQueue> partitionQueues = new HashMap<TopicPartition, RecordQueue>();
        this.processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, this.stateMgr, metrics, cache);
        TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
        DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
        for (TopicPartition partition : partitions) {
            SourceNode source = topology.source(partition.topic());
            TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
            RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, this.processorContext, this.logContext);
            partitionQueues.put(partition, queue);
        }
        this.recordInfo = new PartitionGroup.RecordInfo();
        this.partitionGroup = new PartitionGroup(partitionQueues);
        this.stateMgr.registerGlobalStateStores(topology.globalStateStores());
        if (this.eosEnabled) {
            this.producer.initTransactions();
        }
    }

    @Override
    public boolean initializeStateStores() {
        this.log.trace("Initializing state stores");
        this.registerStateStores();
        return this.changelogPartitions().isEmpty();
    }

    @Override
    public void initializeTopology() {
        this.initTopology();
        if (this.eosEnabled) {
            try {
                this.producer.beginTransaction();
            }
            catch (ProducerFencedException fatal) {
                throw new TaskMigratedException(this, (Throwable)fatal);
            }
            this.transactionInFlight = true;
        }
        this.processorContext.initialized();
        this.taskInitialized = true;
    }

    @Override
    public void resume() {
        this.log.debug("Resuming");
    }

    public boolean process() {
        StampedRecord record = this.partitionGroup.nextRecord(this.recordInfo);
        if (record == null) {
            return false;
        }
        try {
            ProcessorNode currNode = this.recordInfo.node();
            TopicPartition partition = this.recordInfo.partition();
            this.log.trace("Start processing one record [{}]", (Object)record);
            this.updateProcessorContext(record, currNode);
            currNode.process(record.key(), record.value());
            this.log.trace("Completed processing one record [{}]", (Object)record);
            this.consumedOffsets.put(partition, record.offset());
            this.commitOffsetNeeded = true;
            if (this.recordInfo.queue().size() == this.maxBufferedSize) {
                this.consumer.resume(Collections.singleton(partition));
            }
        }
        catch (ProducerFencedException fatal) {
            throw new TaskMigratedException(this, (Throwable)fatal);
        }
        catch (KafkaException e) {
            throw new StreamsException(String.format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d", this.id(), this.processorContext.currentNode().name(), record.topic(), record.partition(), record.offset()), e);
        }
        finally {
            this.processorContext.setCurrentNode(null);
        }
        return true;
    }

    @Override
    public void punctuate(ProcessorNode node, long timestamp, PunctuationType type, Punctuator punctuator) {
        if (this.processorContext.currentNode() != null) {
            throw new IllegalStateException(String.format("%sCurrent node is not null", this.logPrefix));
        }
        this.updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", new Object[]{node.name(), timestamp, type});
        }
        try {
            node.punctuate(timestamp, punctuator);
        }
        catch (ProducerFencedException fatal) {
            throw new TaskMigratedException(this, (Throwable)fatal);
        }
        catch (KafkaException e) {
            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", this.logPrefix, node.name()), e);
        }
        finally {
            this.processorContext.setCurrentNode(null);
        }
    }

    private void updateProcessorContext(StampedRecord record, ProcessorNode currNode) {
        this.processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
        this.processorContext.setCurrentNode(currNode);
    }

    @Override
    public void commit() {
        this.commit(true);
    }

    void commit(final boolean startNewTransaction) {
        this.log.debug("Committing");
        this.metrics.metrics.measureLatencyNs(this.time, new Runnable(){

            @Override
            public void run() {
                StreamTask.this.flushState();
                if (!StreamTask.this.eosEnabled) {
                    StreamTask.this.stateMgr.checkpoint(StreamTask.this.recordCollectorOffsets());
                }
                StreamTask.this.commitOffsets(startNewTransaction);
            }
        }, this.metrics.taskCommitTimeSensor);
        this.commitRequested = false;
    }

    @Override
    protected Map<TopicPartition, Long> recordCollectorOffsets() {
        return this.recordCollector.offsets();
    }

    @Override
    protected void flushState() {
        this.log.trace("Flushing state and producer");
        super.flushState();
        try {
            this.recordCollector.flush();
        }
        catch (ProducerFencedException fatal) {
            throw new TaskMigratedException(this, (Throwable)fatal);
        }
    }

    private void commitOffsets(boolean startNewTransaction) {
        try {
            if (this.commitOffsetNeeded) {
                this.log.trace("Committing offsets");
                HashMap<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<TopicPartition, OffsetAndMetadata>(this.consumedOffsets.size());
                for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
                    TopicPartition partition = entry.getKey();
                    long offset = entry.getValue() + 1L;
                    consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
                    this.stateMgr.putOffsetLimit(partition, offset);
                }
                if (this.eosEnabled) {
                    this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, this.applicationId);
                    this.producer.commitTransaction();
                    this.transactionInFlight = false;
                    if (startNewTransaction) {
                        this.transactionInFlight = true;
                        this.producer.beginTransaction();
                    }
                } else {
                    this.consumer.commitSync(consumedOffsetsAndMetadata);
                }
                this.commitOffsetNeeded = false;
            } else if (this.eosEnabled && !startNewTransaction && this.transactionInFlight) {
                this.producer.commitTransaction();
                this.transactionInFlight = false;
            }
        }
        catch (CommitFailedException | ProducerFencedException fatal) {
            throw new TaskMigratedException(this, fatal);
        }
    }

    Map<TopicPartition, Long> purgableOffsets() {
        HashMap<TopicPartition, Long> purgableConsumedOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            if (!this.topology.isRepartitionTopic(tp.topic())) continue;
            purgableConsumedOffsets.put(tp, entry.getValue() + 1L);
        }
        return purgableConsumedOffsets;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initTopology() {
        this.log.trace("Initializing processor nodes of the topology");
        for (ProcessorNode node : this.topology.processors()) {
            this.processorContext.setCurrentNode(node);
            try {
                node.init(this.processorContext);
            }
            finally {
                this.processorContext.setCurrentNode(null);
            }
        }
    }

    @Override
    public void suspend() {
        this.log.debug("Suspending");
        this.suspend(true);
    }

    void suspend(boolean clean) {
        this.closeTopology();
        if (clean) {
            this.commit(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeTopology() {
        this.log.trace("Closing processor topology");
        this.partitionGroup.clear();
        RuntimeException exception = null;
        if (this.taskInitialized) {
            for (ProcessorNode node : this.topology.processors()) {
                this.processorContext.setCurrentNode(node);
                try {
                    node.close();
                }
                catch (RuntimeException e) {
                    exception = e;
                }
                finally {
                    this.processorContext.setCurrentNode(null);
                }
            }
        }
        if (exception != null) {
            throw exception;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void closeSuspended(boolean clean, boolean isZombie, RuntimeException firstException) {
        try {
            this.closeStateManager(clean);
        }
        catch (RuntimeException e) {
            clean = false;
            if (firstException == null) {
                firstException = e;
            }
            this.log.error("Could not close state manager due to the following error:", (Throwable)e);
        }
        try {
            this.partitionGroup.close();
            this.metrics.removeAllSensors();
        }
        finally {
            if (this.eosEnabled) {
                if (!clean) {
                    try {
                        if (!isZombie) {
                            this.producer.abortTransaction();
                        }
                        this.transactionInFlight = false;
                    }
                    catch (ProducerFencedException e) {}
                }
                try {
                    if (!isZombie) {
                        this.recordCollector.close();
                    }
                }
                catch (Throwable e) {
                    this.log.error("Failed to close producer due to the following error:", e);
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    @Override
    public void close(boolean clean, boolean isZombie) {
        this.log.debug("Closing");
        RuntimeException firstException = null;
        try {
            this.suspend(clean);
        }
        catch (RuntimeException e) {
            clean = false;
            firstException = e;
            this.log.error("Could not close task due to the following error:", (Throwable)e);
        }
        this.closeSuspended(clean, isZombie, firstException);
    }

    public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
        int oldQueueSize = this.partitionGroup.numBuffered(partition);
        int newQueueSize = this.partitionGroup.addRawRecords(partition, records);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Added records into the buffered queue of partition {}, new queue size is {}", (Object)partition, (Object)newQueueSize);
        }
        if (newQueueSize > this.maxBufferedSize) {
            this.consumer.pause(Collections.singleton(partition));
        }
        return newQueueSize - oldQueueSize;
    }

    public Cancellable schedule(long interval, PunctuationType type, Punctuator punctuator) {
        switch (type) {
            case STREAM_TIME: {
                return this.schedule(0L, interval, type, punctuator);
            }
            case WALL_CLOCK_TIME: {
                return this.schedule(this.time.milliseconds() + interval, interval, type, punctuator);
            }
        }
        throw new IllegalArgumentException("Unrecognized PunctuationType: " + (Object)((Object)type));
    }

    Cancellable schedule(long startTime, long interval, PunctuationType type, Punctuator punctuator) {
        if (this.processorContext.currentNode() == null) {
            throw new IllegalStateException(String.format("%sCurrent node is null", this.logPrefix));
        }
        PunctuationSchedule schedule = new PunctuationSchedule(this.processorContext.currentNode(), startTime, interval, punctuator);
        switch (type) {
            case STREAM_TIME: {
                return this.streamTimePunctuationQueue.schedule(schedule);
            }
            case WALL_CLOCK_TIME: {
                return this.systemTimePunctuationQueue.schedule(schedule);
            }
        }
        throw new IllegalArgumentException("Unrecognized PunctuationType: " + (Object)((Object)type));
    }

    int numBuffered() {
        return this.partitionGroup.numBuffered();
    }

    public boolean maybePunctuateStreamTime() {
        long timestamp = this.partitionGroup.timestamp();
        if (timestamp == -1L) {
            return false;
        }
        return this.streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this);
    }

    public boolean maybePunctuateSystemTime() {
        long timestamp = this.time.milliseconds();
        return this.systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
    }

    void needCommit() {
        this.commitRequested = true;
    }

    boolean commitNeeded() {
        return this.commitRequested;
    }

    RecordCollector recordCollector() {
        return this.recordCollector;
    }

    RecordCollector createRecordCollector(LogContext logContext, ProductionExceptionHandler productionExceptionHandler) {
        return new RecordCollectorImpl(this.producer, this.id.toString(), logContext, productionExceptionHandler);
    }

    protected class TaskMetrics {
        final StreamsMetricsImpl metrics;
        final Sensor taskCommitTimeSensor;

        TaskMetrics(StreamsMetrics metrics) {
            String name = StreamTask.this.id().toString();
            this.metrics = (StreamsMetricsImpl)metrics;
            this.taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG, new String[0]);
        }

        void removeAllSensors() {
            this.metrics.removeSensor(this.taskCommitTimeSensor);
        }
    }
}

