/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.CompoundStat;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.AssignedTasks;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.ThreadDataProvider;
import org.apache.kafka.streams.processor.internals.ThreadMetadataProvider;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

public class StreamThread
extends Thread
implements ThreadDataProvider {
    private final Logger log;
    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
    private final Time time;
    private final long pollTimeMs;
    private final long commitTimeMs;
    private final Object stateLock;
    private final UUID processId;
    private final String clientId;
    private final String logPrefix;
    private final StreamsConfig config;
    private final TaskManager taskManager;
    private final StateDirectory stateDirectory;
    private final PartitionGrouper partitionGrouper;
    private final StreamsMetricsThreadImpl streamsMetrics;
    private final StreamsMetadataState streamsMetadataState;
    private long lastCommitMs;
    private long timerStartedMs;
    private String originalReset;
    private Throwable rebalanceException = null;
    private boolean processStandbyRecords = false;
    private volatile State state = State.CREATED;
    private StateListener stateListener;
    private ThreadMetadataProvider metadataProvider;
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
    final ConsumerRebalanceListener rebalanceListener;
    final Consumer<byte[], byte[]> restoreConsumer;
    protected final Consumer<byte[], byte[]> consumer;
    protected final InternalTopologyBuilder builder;
    public final String applicationId;
    private volatile ThreadMetadata threadMetadata;
    private static final int UNLIMITED_RECORDS = -1;

    public void setStateListener(StateListener listener) {
        this.stateListener = listener;
    }

    public State state() {
        return this.state;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean setState(State newState) {
        State oldState = this.state;
        Object object = this.stateLock;
        synchronized (object) {
            if (this.state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                return false;
            }
            if (this.state == State.DEAD) {
                return false;
            }
            if (this.state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
                return false;
            }
            if (!this.state.isValidTransition(newState)) {
                this.log.error("Unexpected state transition from {} to {}", (Object)oldState, (Object)newState);
                throw new StreamsException(this.logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
            }
            this.log.info("State transition from {} to {}", (Object)oldState, (Object)newState);
            this.state = newState;
            if (newState == State.RUNNING) {
                this.updateThreadMetadata(this.taskManager.activeTasks(), this.taskManager.standbyTasks());
            } else {
                this.updateThreadMetadata(null, null);
            }
        }
        if (this.stateListener != null) {
            this.stateListener.onChange(this, this.state, oldState);
        }
        return true;
    }

    public boolean isRunningAndNotRebalancing() {
        return this.state == State.RUNNING;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isRunning() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state == State.RUNNING || this.state == State.PARTITIONS_REVOKED || this.state == State.PARTITIONS_ASSIGNED;
        }
    }

    public StreamThread(InternalTopologyBuilder builder, String clientId, String threadClientId, StreamsConfig config, UUID processId, Time time, StreamsMetadataState streamsMetadataState, TaskManager taskManager, StreamsMetricsThreadImpl streamsMetrics, KafkaClientSupplier clientSupplier, Consumer<byte[], byte[]> restoreConsumer, StateDirectory stateDirectory) {
        super(threadClientId);
        this.builder = builder;
        this.clientId = clientId;
        this.applicationId = config.getString("application.id");
        this.pollTimeMs = config.getLong("poll.ms");
        this.commitTimeMs = config.getLong("commit.interval.ms");
        this.processId = processId;
        this.time = time;
        this.streamsMetadataState = streamsMetadataState;
        this.taskManager = taskManager;
        this.logPrefix = String.format("stream-thread [%s] ", threadClientId);
        this.streamsMetrics = streamsMetrics;
        this.restoreConsumer = restoreConsumer;
        this.stateDirectory = stateDirectory;
        this.config = config;
        this.stateLock = new Object();
        this.standbyRecords = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
        this.partitionGrouper = (PartitionGrouper)config.getConfiguredInstance("partition.grouper", PartitionGrouper.class);
        LogContext logContext = new LogContext(this.logPrefix);
        this.log = logContext.logger(StreamThread.class);
        this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log);
        this.log.info("Creating consumer client");
        Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, this.applicationId, threadClientId);
        if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
            this.originalReset = (String)consumerConfigs.get("auto.offset.reset");
            this.log.info("Custom offset resets specified updating configs original auto offset reset {}", (Object)this.originalReset);
            consumerConfigs.put("auto.offset.reset", "none");
        }
        this.consumer = clientSupplier.getConsumer(consumerConfigs);
        taskManager.setConsumer(this.consumer);
        this.updateThreadMetadata(null, null);
    }

    public static StreamThread create(InternalTopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, UUID processId, String clientId, Metrics metrics, Time time, StreamsMetadataState streamsMetadataState, long cacheSizeBytes, StateDirectory stateDirectory, StateRestoreListener userStateRestoreListener) {
        String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
        StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + threadClientId, Collections.singletonMap("client-id", threadClientId));
        String logPrefix = String.format("stream-thread [%s] ", threadClientId);
        LogContext logContext = new LogContext(logPrefix);
        Logger log = logContext.logger(StreamThread.class);
        if (config.getLong("cache.max.bytes.buffering") < 0L) {
            log.warn("Negative cache size passed in thread. Reverting to cache size of 0 bytes");
        }
        ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
        boolean eosEnabled = "exactly_once".equals(config.getString("processing.guarantee"));
        log.info("Creating restore consumer client");
        Map<String, Object> consumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
        Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs);
        StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
        Producer<byte[], byte[]> threadProducer = null;
        if (!eosEnabled) {
            Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId);
            log.info("Creating shared producer client");
            threadProducer = clientSupplier.getProducer(producerConfigs);
        }
        TaskCreator activeTaskCreator = new TaskCreator(builder, config, streamsMetrics, stateDirectory, streamsMetrics.taskCreatedSensor, changelogReader, cache, time, clientSupplier, threadProducer, threadClientId, log);
        StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(builder, config, streamsMetrics, stateDirectory, streamsMetrics.taskCreatedSensor, changelogReader, time, log);
        TaskManager taskManager = new TaskManager(changelogReader, logPrefix, restoreConsumer, activeTaskCreator, standbyTaskCreator, new AssignedTasks(logContext, "stream task"), new AssignedTasks(logContext, "standby task"));
        return new StreamThread(builder, clientId, threadClientId, config, processId, time, streamsMetadataState, taskManager, streamsMetrics, clientSupplier, restoreConsumer, stateDirectory);
    }

    @Override
    public void run() {
        this.log.info("Starting");
        this.setState(State.RUNNING);
        boolean cleanRun = false;
        try {
            this.runLoop();
            cleanRun = true;
        }
        catch (KafkaException e) {
            throw e;
        }
        catch (Exception e) {
            this.log.error("Encountered the following error during processing:", (Throwable)e);
            throw e;
        }
        finally {
            this.completeShutdown(cleanRun);
        }
    }

    void setRebalanceException(Throwable rebalanceException) {
        this.rebalanceException = rebalanceException;
    }

    private void runLoop() {
        long recordsProcessedBeforeCommit = -1L;
        this.consumer.subscribe(this.builder.sourceTopicPattern(), this.rebalanceListener);
        while (this.isRunning()) {
            try {
                recordsProcessedBeforeCommit = this.runOnce(recordsProcessedBeforeCommit);
            }
            catch (TaskMigratedException ignoreAndRejoinGroup) {
                this.log.warn("Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.", (Throwable)((Object)ignoreAndRejoinGroup));
            }
        }
    }

    long runOnce(long recordsProcessedBeforeCommit) {
        ConsumerRecords<byte[], byte[]> records;
        long processedBeforeCommit = recordsProcessedBeforeCommit;
        this.timerStartedMs = this.time.milliseconds();
        if (this.state == State.PARTITIONS_ASSIGNED) {
            records = this.pollRequests(0L);
            if (this.taskManager.updateNewAndRestoringTasks()) {
                this.setState(State.RUNNING);
            }
        } else {
            records = this.pollRequests(this.pollTimeMs);
            if (this.state == State.PARTITIONS_ASSIGNED && this.taskManager.updateNewAndRestoringTasks()) {
                this.setState(State.RUNNING);
            }
        }
        if (records != null && !records.isEmpty() && this.taskManager.hasActiveRunningTasks()) {
            this.streamsMetrics.pollTimeSensor.record((double)this.computeLatency(), this.timerStartedMs);
            this.addRecordsToTasks(records);
            long totalProcessed = this.processAndMaybeCommit(recordsProcessedBeforeCommit);
            if (totalProcessed > 0L) {
                long processLatency = this.computeLatency();
                this.streamsMetrics.processTimeSensor.record((double)processLatency / (double)totalProcessed, this.timerStartedMs);
                processedBeforeCommit = this.adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed, processLatency, this.commitTimeMs);
            }
        }
        this.punctuate();
        this.maybeCommit(this.timerStartedMs);
        this.maybeUpdateStandbyTasks(this.timerStartedMs);
        return processedBeforeCommit;
    }

    private ConsumerRecords<byte[], byte[]> pollRequests(long pollTimeMs) {
        ConsumerRecords records = null;
        try {
            records = this.consumer.poll(pollTimeMs);
        }
        catch (InvalidOffsetException e) {
            this.resetInvalidOffsets(e);
        }
        if (this.rebalanceException != null) {
            if (this.rebalanceException instanceof TaskMigratedException) {
                throw (TaskMigratedException)((Object)this.rebalanceException);
            }
            throw new StreamsException(this.logPrefix + "Failed to rebalance.", this.rebalanceException);
        }
        return records;
    }

    private void resetInvalidOffsets(InvalidOffsetException e) {
        Set partitions = e.partitions();
        HashSet<String> loggedTopics = new HashSet<String>();
        HashSet<TopicPartition> seekToBeginning = new HashSet<TopicPartition>();
        HashSet<TopicPartition> seekToEnd = new HashSet<TopicPartition>();
        for (TopicPartition partition : partitions) {
            if (this.builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
                this.addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics);
                continue;
            }
            if (this.builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
                this.addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
                continue;
            }
            if (this.originalReset == null || !this.originalReset.equals("earliest") && !this.originalReset.equals("latest")) {
                String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))";
                throw new StreamsException(String.format("No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))", partition.topic(), partition.partition()), e);
            }
            if (this.originalReset.equals("earliest")) {
                this.addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
                continue;
            }
            if (!this.originalReset.equals("latest")) continue;
            this.addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
        }
        if (!seekToBeginning.isEmpty()) {
            this.consumer.seekToBeginning(seekToBeginning);
        }
        if (!seekToEnd.isEmpty()) {
            this.consumer.seekToEnd(seekToEnd);
        }
    }

    private void addToResetList(TopicPartition partition, Set<TopicPartition> partitions, String logMessage, String resetPolicy, Set<String> loggedTopics) {
        String topic = partition.topic();
        if (loggedTopics.add(topic)) {
            this.log.info(logMessage, (Object)topic, (Object)resetPolicy);
        }
        partitions.add(partition);
    }

    private void addRecordsToTasks(ConsumerRecords<byte[], byte[]> records) {
        if (records != null && !records.isEmpty()) {
            int numAddedRecords = 0;
            for (TopicPartition partition : records.partitions()) {
                Task task = this.taskManager.activeTask(partition);
                numAddedRecords += task.addRecords(partition, records.records(partition));
            }
            this.streamsMetrics.skippedRecordsSensor.record((double)(records.count() - numAddedRecords), this.timerStartedMs);
        }
    }

    private long processAndMaybeCommit(long recordsProcessedBeforeCommit) {
        long processed;
        long totalProcessedSinceLastMaybeCommit = 0L;
        do {
            int committed;
            if ((processed = (long)this.taskManager.process()) > 0L) {
                this.streamsMetrics.processTimeSensor.record((double)this.computeLatency() / (double)processed, this.timerStartedMs);
            }
            this.punctuate();
            if (recordsProcessedBeforeCommit != -1L && (totalProcessedSinceLastMaybeCommit += processed) >= recordsProcessedBeforeCommit) {
                totalProcessedSinceLastMaybeCommit = 0L;
                this.maybeCommit(this.timerStartedMs);
            }
            if ((committed = this.taskManager.maybeCommitActiveTasks()) <= 0) continue;
            this.streamsMetrics.commitTimeSensor.record((double)this.computeLatency() / (double)committed, this.timerStartedMs);
        } while (processed != 0L);
        return totalProcessedSinceLastMaybeCommit;
    }

    private void punctuate() {
        int punctuated = this.taskManager.punctuate();
        if (punctuated > 0) {
            this.streamsMetrics.punctuateTimeSensor.record((double)this.computeLatency() / (double)punctuated, this.timerStartedMs);
        }
    }

    private long adjustRecordsProcessedBeforeCommit(long prevRecordsProcessedBeforeCommit, long totalProcessed, long processLatency, long commitTime) {
        long recordsProcessedBeforeCommit = -1L;
        if (processLatency > 0L && processLatency > commitTime) {
            recordsProcessedBeforeCommit = Math.max(1L, commitTime * totalProcessed / processLatency);
            this.log.debug("processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}", new Object[]{processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit});
        } else if (prevRecordsProcessedBeforeCommit != -1L && processLatency > 0L) {
            recordsProcessedBeforeCommit = Math.max(1L, commitTime * totalProcessed / processLatency);
            this.log.debug("processing latency {} < commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}", new Object[]{processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit});
        }
        return recordsProcessedBeforeCommit;
    }

    void maybeCommit(long now) {
        if (this.commitTimeMs >= 0L && this.lastCommitMs + this.commitTimeMs < now) {
            int committed;
            if (this.log.isTraceEnabled()) {
                this.log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", new Object[]{this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), now - this.lastCommitMs, this.commitTimeMs});
            }
            if ((committed = this.taskManager.commitAll()) > 0) {
                this.streamsMetrics.commitTimeSensor.record((double)this.computeLatency() / (double)committed, this.timerStartedMs);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Committed all active tasks {} and standby tasks {} in {}ms", new Object[]{this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.timerStartedMs - now});
            }
            this.lastCommitMs = now;
            this.processStandbyRecords = true;
        }
    }

    private void maybeUpdateStandbyTasks(long now) {
        if (this.state == State.RUNNING && this.taskManager.hasStandbyRunningTasks()) {
            ConsumerRecords records;
            List<ConsumerRecord<byte[], byte[]>> remaining;
            if (this.processStandbyRecords) {
                if (!this.standbyRecords.isEmpty()) {
                    HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
                    for (Map.Entry entry : this.standbyRecords.entrySet()) {
                        TopicPartition partition = (TopicPartition)entry.getKey();
                        remaining = (List<ConsumerRecord<byte[], byte[]>>)entry.getValue();
                        if (remaining == null) continue;
                        Task task = this.taskManager.standbyTask(partition);
                        remaining = task.update(partition, remaining);
                        if (remaining != null) {
                            remainingStandbyRecords.put(partition, remaining);
                            continue;
                        }
                        this.restoreConsumer.resume(Collections.singleton(partition));
                    }
                    this.standbyRecords = remainingStandbyRecords;
                    this.log.debug("Updated standby tasks {} in {}ms", this.taskManager.standbyTaskIds(), (Object)(this.time.milliseconds() - now));
                }
                this.processStandbyRecords = false;
            }
            if (!(records = this.restoreConsumer.poll(0L)).isEmpty()) {
                for (TopicPartition topicPartition : records.partitions()) {
                    Task task = this.taskManager.standbyTask(topicPartition);
                    if (task == null) {
                        throw new StreamsException(this.logPrefix + "Missing standby task for partition " + topicPartition);
                    }
                    remaining = task.update(topicPartition, records.records(topicPartition));
                    if (remaining == null) continue;
                    this.restoreConsumer.pause(Collections.singleton(topicPartition));
                    this.standbyRecords.put(topicPartition, remaining);
                }
            }
        }
    }

    private long computeLatency() {
        long previousTimeMs = this.timerStartedMs;
        this.timerStartedMs = this.time.milliseconds();
        return Math.max(this.timerStartedMs - previousTimeMs, 0L);
    }

    public void shutdown() {
        this.log.info("Informed to shut down");
        this.setState(State.PENDING_SHUTDOWN);
    }

    public Map<TaskId, Task> tasks() {
        return this.taskManager.activeTasks();
    }

    @Override
    public Set<TaskId> prevActiveTasks() {
        return this.taskManager.prevActiveTaskIds();
    }

    @Override
    public InternalTopologyBuilder builder() {
        return this.builder;
    }

    @Override
    public String name() {
        return this.getName();
    }

    @Override
    public Set<TaskId> cachedTasks() {
        HashSet<TaskId> tasks = new HashSet<TaskId>();
        File[] stateDirs = this.stateDirectory.listTaskDirectories();
        if (stateDirs != null) {
            for (File dir : stateDirs) {
                try {
                    TaskId id = TaskId.parse(dir.getName());
                    if (!new File(dir, ".checkpoint").exists()) continue;
                    tasks.add(id);
                }
                catch (TaskIdFormatException taskIdFormatException) {
                    // empty catch block
                }
            }
        }
        return tasks;
    }

    @Override
    public UUID processId() {
        return this.processId;
    }

    @Override
    public StreamsConfig config() {
        return this.config;
    }

    @Override
    public PartitionGrouper partitionGrouper() {
        return this.partitionGrouper;
    }

    @Override
    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        StringBuilder sb = new StringBuilder().append(indent).append("StreamsThread appId: ").append(this.applicationId).append("\n").append(indent).append("\tStreamsThread clientId: ").append(this.clientId).append("\n").append(indent).append("\tStreamsThread threadId: ").append(this.getName()).append("\n");
        sb.append(this.taskManager.toString(indent));
        return sb.toString();
    }

    String threadClientId() {
        return this.getName();
    }

    @Override
    public void setThreadMetadataProvider(ThreadMetadataProvider metadataProvider) {
        this.metadataProvider = metadataProvider;
        this.taskManager.setThreadMetadataProvider(metadataProvider);
    }

    private void completeShutdown(boolean cleanRun) {
        this.setState(State.PENDING_SHUTDOWN);
        this.log.info("Shutting down");
        this.taskManager.shutdown(cleanRun);
        try {
            this.consumer.close();
        }
        catch (Throwable e) {
            this.log.error("Failed to close consumer due to the following error:", e);
        }
        try {
            this.restoreConsumer.close();
        }
        catch (Throwable e) {
            this.log.error("Failed to close restore consumer due to the following error:", e);
        }
        this.streamsMetrics.removeAllSensors();
        this.setState(State.DEAD);
        this.log.info("Shutdown complete");
    }

    private void clearStandbyRecords() {
        this.standbyRecords.clear();
    }

    private void refreshMetadataState() {
        this.streamsMetadataState.onChange(this.metadataProvider.getPartitionsByHostState(), this.metadataProvider.clusterMetadata());
    }

    public final ThreadMetadata threadMetadata() {
        return this.threadMetadata;
    }

    private void updateThreadMetadata(Map<TaskId, Task> activeTasks, Map<TaskId, Task> standbyTasks) {
        HashSet<TaskMetadata> activeTasksMetadata = new HashSet<TaskMetadata>();
        if (activeTasks != null) {
            for (Map.Entry<TaskId, Task> task : activeTasks.entrySet()) {
                activeTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
            }
        }
        HashSet<TaskMetadata> standbyTasksMetadata = new HashSet<TaskMetadata>();
        if (standbyTasks != null) {
            for (Map.Entry<TaskId, Task> task : standbyTasks.entrySet()) {
                standbyTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
            }
        }
        this.threadMetadata = new ThreadMetadata(this.getName(), this.state().name(), activeTasksMetadata, standbyTasksMetadata);
    }

    static class StreamsMetricsThreadImpl
    extends StreamsMetricsImpl {
        final Sensor commitTimeSensor;
        final Sensor pollTimeSensor;
        final Sensor processTimeSensor;
        final Sensor punctuateTimeSensor;
        final Sensor taskCreatedSensor;
        final Sensor tasksClosedSensor;
        final Sensor skippedRecordsSensor;

        StreamsMetricsThreadImpl(Metrics metrics, String groupName, String prefix, Map<String, String> tags) {
            super(metrics, groupName, tags);
            this.commitTimeSensor = metrics.sensor(prefix + ".commit-latency", Sensor.RecordingLevel.INFO);
            this.commitTimeSensor.add(metrics.metricName("commit-latency-avg", this.groupName, "The average commit time in ms", this.tags), (MeasurableStat)new Avg());
            this.commitTimeSensor.add(metrics.metricName("commit-latency-max", this.groupName, "The maximum commit time in ms", this.tags), (MeasurableStat)new Max());
            this.commitTimeSensor.add((CompoundStat)this.createMeter(metrics, (SampledStat)new Count(), "commit", "commit calls"));
            this.pollTimeSensor = metrics.sensor(prefix + ".poll-latency", Sensor.RecordingLevel.INFO);
            this.pollTimeSensor.add(metrics.metricName("poll-latency-avg", this.groupName, "The average poll time in ms", this.tags), (MeasurableStat)new Avg());
            this.pollTimeSensor.add(metrics.metricName("poll-latency-max", this.groupName, "The maximum poll time in ms", this.tags), (MeasurableStat)new Max());
            this.pollTimeSensor.add((CompoundStat)this.createMeter(metrics, (SampledStat)new Count(), "poll", "record-poll calls"));
            this.processTimeSensor = metrics.sensor(prefix + ".process-latency", Sensor.RecordingLevel.INFO);
            this.processTimeSensor.add(metrics.metricName("process-latency-avg", this.groupName, "The average process time in ms", this.tags), (MeasurableStat)new Avg());
            this.processTimeSensor.add(metrics.metricName("process-latency-max", this.groupName, "The maximum process time in ms", this.tags), (MeasurableStat)new Max());
            this.processTimeSensor.add((CompoundStat)this.createMeter(metrics, (SampledStat)new Count(), "process", "process calls"));
            this.punctuateTimeSensor = metrics.sensor(prefix + ".punctuate-latency", Sensor.RecordingLevel.INFO);
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", this.groupName, "The average punctuate time in ms", this.tags), (MeasurableStat)new Avg());
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", this.groupName, "The maximum punctuate time in ms", this.tags), (MeasurableStat)new Max());
            this.punctuateTimeSensor.add((CompoundStat)this.createMeter(metrics, (SampledStat)new Count(), "punctuate", "punctuate calls"));
            this.taskCreatedSensor = metrics.sensor(prefix + ".task-created", Sensor.RecordingLevel.INFO);
            this.taskCreatedSensor.add((CompoundStat)this.createMeter(metrics, (SampledStat)new Count(), "task-created", "newly created tasks"));
            this.tasksClosedSensor = metrics.sensor(prefix + ".task-closed", Sensor.RecordingLevel.INFO);
            this.tasksClosedSensor.add((CompoundStat)this.createMeter(metrics, (SampledStat)new Count(), "task-closed", "closed tasks"));
            this.skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
            this.skippedRecordsSensor.add((CompoundStat)this.createMeter(metrics, (SampledStat)new Sum(), "skipped-records", "skipped records"));
        }

        private Meter createMeter(Metrics metrics, SampledStat stat, String baseName, String descriptiveName) {
            MetricName rateMetricName = metrics.metricName(baseName + "-rate", this.groupName, String.format("The average per-second number of %s", descriptiveName), this.tags);
            MetricName totalMetricName = metrics.metricName(baseName + "-total", this.groupName, String.format("The total number of %s", descriptiveName), this.tags);
            return new Meter(stat, rateMetricName, totalMetricName);
        }

        void removeAllSensors() {
            this.removeSensor(this.commitTimeSensor);
            this.removeSensor(this.pollTimeSensor);
            this.removeSensor(this.processTimeSensor);
            this.removeSensor(this.punctuateTimeSensor);
            this.removeSensor(this.taskCreatedSensor);
            this.removeSensor(this.tasksClosedSensor);
            this.removeSensor(this.skippedRecordsSensor);
        }
    }

    static class StandbyTaskCreator
    extends AbstractTaskCreator {
        StandbyTaskCreator(InternalTopologyBuilder builder, StreamsConfig config, StreamsMetrics streamsMetrics, StateDirectory stateDirectory, Sensor taskCreatedSensor, ChangelogReader storeChangelogReader, Time time, Logger log) {
            super(builder, config, streamsMetrics, stateDirectory, taskCreatedSensor, storeChangelogReader, time, log);
        }

        @Override
        StandbyTask createTask(Consumer<byte[], byte[]> consumer, TaskId taskId, Set<TopicPartition> partitions) {
            this.taskCreatedSensor.record();
            ProcessorTopology topology = this.builder.build(taskId.topicGroupId);
            if (!topology.stateStores().isEmpty()) {
                return new StandbyTask(taskId, this.applicationId, partitions, topology, consumer, this.storeChangelogReader, this.config, this.streamsMetrics, this.stateDirectory);
            }
            this.log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", (Object)taskId, partitions);
            return null;
        }
    }

    static class TaskCreator
    extends AbstractTaskCreator {
        private final ThreadCache cache;
        private final KafkaClientSupplier clientSupplier;
        private final String threadClientId;
        private final Producer<byte[], byte[]> threadProducer;

        TaskCreator(InternalTopologyBuilder builder, StreamsConfig config, StreamsMetrics streamsMetrics, StateDirectory stateDirectory, Sensor taskCreatedSensor, ChangelogReader storeChangelogReader, ThreadCache cache, Time time, KafkaClientSupplier clientSupplier, Producer<byte[], byte[]> threadProducer, String threadClientId, Logger log) {
            super(builder, config, streamsMetrics, stateDirectory, taskCreatedSensor, storeChangelogReader, time, log);
            this.cache = cache;
            this.clientSupplier = clientSupplier;
            this.threadProducer = threadProducer;
            this.threadClientId = threadClientId;
        }

        @Override
        StreamTask createTask(Consumer<byte[], byte[]> consumer, TaskId taskId, Set<TopicPartition> partitions) {
            this.taskCreatedSensor.record();
            return new StreamTask(taskId, this.applicationId, partitions, this.builder.build(taskId.topicGroupId), consumer, this.storeChangelogReader, this.config, this.streamsMetrics, this.stateDirectory, this.cache, this.time, this.createProducer(taskId));
        }

        private Producer<byte[], byte[]> createProducer(TaskId id) {
            if (this.threadProducer == null) {
                Map<String, Object> producerConfigs = this.config.getProducerConfigs(this.threadClientId + "-" + id);
                this.log.info("Creating producer client for task {}", (Object)id);
                producerConfigs.put("transactional.id", this.applicationId + "-" + id);
                return this.clientSupplier.getProducer(producerConfigs);
            }
            return this.threadProducer;
        }

        @Override
        public void close() {
            if (this.threadProducer != null) {
                try {
                    this.threadProducer.close();
                }
                catch (Throwable e) {
                    this.log.error("Failed to close producer due to the following error:", e);
                }
            }
        }
    }

    static abstract class AbstractTaskCreator {
        final String applicationId;
        final InternalTopologyBuilder builder;
        final StreamsConfig config;
        final StreamsMetrics streamsMetrics;
        final StateDirectory stateDirectory;
        final Sensor taskCreatedSensor;
        final ChangelogReader storeChangelogReader;
        final Time time;
        final Logger log;

        AbstractTaskCreator(InternalTopologyBuilder builder, StreamsConfig config, StreamsMetrics streamsMetrics, StateDirectory stateDirectory, Sensor taskCreatedSensor, ChangelogReader storeChangelogReader, Time time, Logger log) {
            this.applicationId = config.getString("application.id");
            this.builder = builder;
            this.config = config;
            this.streamsMetrics = streamsMetrics;
            this.stateDirectory = stateDirectory;
            this.taskCreatedSensor = taskCreatedSensor;
            this.storeChangelogReader = storeChangelogReader;
            this.time = time;
            this.log = log;
        }

        Collection<Task> createTasks(Consumer<byte[], byte[]> consumer, Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
            ArrayList<Task> createdTasks = new ArrayList<Task>();
            for (Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
                Set<TopicPartition> partitions;
                TaskId taskId = newTaskAndPartitions.getKey();
                Task task = this.createTask(consumer, taskId, partitions = newTaskAndPartitions.getValue());
                if (task == null) continue;
                this.log.trace("Created task {} with assigned partitions {}", (Object)taskId, partitions);
                createdTasks.add(task);
            }
            return createdTasks;
        }

        abstract Task createTask(Consumer<byte[], byte[]> var1, TaskId var2, Set<TopicPartition> var3);

        public void close() {
        }
    }

    static class RebalanceListener
    implements ConsumerRebalanceListener {
        private final Time time;
        private final TaskManager taskManager;
        private final StreamThread streamThread;
        private final Logger log;

        RebalanceListener(Time time, TaskManager taskManager, StreamThread streamThread, Logger log) {
            this.time = time;
            this.taskManager = taskManager;
            this.streamThread = streamThread;
            this.log = log;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
            long start;
            block5: {
                this.log.debug("at state {}: partitions {} assigned at the end of consumer rebalance.\n\tcurrent suspended active tasks: {}\n\tcurrent suspended standby tasks: {}\n", new Object[]{this.streamThread.state, assignment, this.taskManager.suspendedActiveTaskIds(), this.taskManager.suspendedStandbyTaskIds()});
                start = this.time.milliseconds();
                if (this.streamThread.setState(State.PARTITIONS_ASSIGNED)) break block5;
                this.log.info("partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.prevActiveTaskIds()});
                return;
            }
            try {
                this.taskManager.createTasks(assignment);
                this.streamThread.refreshMetadataState();
            }
            catch (Throwable t) {
                try {
                    this.log.error("Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance: {}", (Object)t.getMessage());
                    this.streamThread.setRebalanceException(t);
                }
                catch (Throwable throwable) {
                    this.log.info("partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.prevActiveTaskIds()});
                    throw throwable;
                }
                this.log.info("partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.prevActiveTaskIds()});
            }
            this.log.info("partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.prevActiveTaskIds()});
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
            this.log.debug("at state {}: partitions {} revoked at the beginning of consumer rebalance.\n\tcurrent assigned active tasks: {}\n\tcurrent assigned standby tasks: {}\n", new Object[]{this.streamThread.state, assignment, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds()});
            if (this.streamThread.setState(State.PARTITIONS_REVOKED)) {
                long start = this.time.milliseconds();
                try {
                    this.taskManager.suspendTasksAndState();
                }
                catch (Throwable t) {
                    try {
                        this.log.error("Error caught during partition revocation, will abort the current process and re-throw at the end of rebalance: {}", (Object)t.getMessage());
                        this.streamThread.setRebalanceException(t);
                    }
                    catch (Throwable throwable) {
                        this.streamThread.refreshMetadataState();
                        this.streamThread.clearStandbyRecords();
                        this.log.info("partition revocation took {} ms.\n\tsuspended active tasks: {}\n\tsuspended standby tasks: {}", new Object[]{this.time.milliseconds() - start, this.taskManager.suspendedActiveTaskIds(), this.taskManager.suspendedStandbyTaskIds()});
                        throw throwable;
                    }
                    this.streamThread.refreshMetadataState();
                    this.streamThread.clearStandbyRecords();
                    this.log.info("partition revocation took {} ms.\n\tsuspended active tasks: {}\n\tsuspended standby tasks: {}", new Object[]{this.time.milliseconds() - start, this.taskManager.suspendedActiveTaskIds(), this.taskManager.suspendedStandbyTaskIds()});
                }
                this.streamThread.refreshMetadataState();
                this.streamThread.clearStandbyRecords();
                this.log.info("partition revocation took {} ms.\n\tsuspended active tasks: {}\n\tsuspended standby tasks: {}", new Object[]{this.time.milliseconds() - start, this.taskManager.suspendedActiveTaskIds(), this.taskManager.suspendedStandbyTaskIds()});
            }
        }
    }

    public static interface StateListener {
        public void onChange(Thread var1, ThreadStateTransitionValidator var2, ThreadStateTransitionValidator var3);
    }

    public static enum State implements ThreadStateTransitionValidator
    {
        CREATED(1, 4),
        RUNNING(2, 4),
        PARTITIONS_REVOKED(3, 4),
        PARTITIONS_ASSIGNED(1, 2, 4),
        PENDING_SHUTDOWN(5),
        DEAD(new Integer[0]);

        private final Set<Integer> validTransitions = new HashSet<Integer>();

        private State(Integer ... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        public boolean isRunning() {
            return this.equals(RUNNING) || this.equals(PARTITIONS_REVOKED) || this.equals(PARTITIONS_ASSIGNED);
        }

        @Override
        public boolean isValidTransition(ThreadStateTransitionValidator newState) {
            State tmpState = (State)newState;
            return this.validTransitions.contains(tmpState.ordinal());
        }
    }
}

